Add will topic validation and acl check (#2075)
* Add will topic validation and acl check
This commit is contained in:
parent
938d30268a
commit
bf7f10ecd1
|
@ -290,7 +290,7 @@ process_packet(?CONNECT_PACKET(
|
|||
properties = ConnProps,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
password = Password} = Connect), PState) ->
|
||||
password = Password} = ConnPkt), PState) ->
|
||||
|
||||
NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),
|
||||
|
||||
|
@ -298,7 +298,6 @@ process_packet(?CONNECT_PACKET(
|
|||
|
||||
%% TODO: Mountpoint...
|
||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||
WillMsg = make_will_msg(Connect),
|
||||
|
||||
PState1 = set_username(Username,
|
||||
PState#pstate{client_id = NewClientId,
|
||||
|
@ -307,16 +306,16 @@ process_packet(?CONNECT_PACKET(
|
|||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
conn_props = ConnProps,
|
||||
will_msg = WillMsg,
|
||||
is_bridge = IsBridge,
|
||||
connected_at = os:timestamp()}),
|
||||
connack(
|
||||
case check_connect(Connect, PState1) of
|
||||
case check_connect(ConnPkt, PState1) of
|
||||
{ok, PState2} ->
|
||||
case authenticate(credentials(PState2), Password) of
|
||||
{ok, IsSuper} ->
|
||||
%% Maybe assign a clientId
|
||||
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
|
||||
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper,
|
||||
will_msg = make_will_msg(ConnPkt)}),
|
||||
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
|
||||
%% Open session
|
||||
case try_open_session(PState3) of
|
||||
|
@ -719,13 +718,15 @@ get_property(Name, Props, Default) ->
|
|||
maps:get(Name, Props, Default).
|
||||
|
||||
make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
|
||||
will_props = WillProps} = Connect) ->
|
||||
emqx_packet:will_msg(if
|
||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||
will_props = WillProps} = ConnPkt) ->
|
||||
emqx_packet:will_msg(
|
||||
case ProtoVer of
|
||||
?MQTT_PROTO_V5 ->
|
||||
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
|
||||
Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
|
||||
true ->
|
||||
Connect
|
||||
ConnPkt#mqtt_packet_connect{
|
||||
will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
|
||||
_ ->
|
||||
ConnPkt
|
||||
end).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -735,7 +736,8 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
|
|||
check_connect(Packet, PState) ->
|
||||
run_check_steps([fun check_proto_ver/2,
|
||||
fun check_client_id/2,
|
||||
fun check_banned/2], Packet, PState).
|
||||
fun check_banned/2,
|
||||
fun check_will_topic/2], Packet, PState).
|
||||
|
||||
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
|
||||
proto_name = Name}, _PState) ->
|
||||
|
@ -766,7 +768,7 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
|
|||
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
|
||||
end.
|
||||
|
||||
check_banned(_Connect, #pstate{enable_ban = false}) ->
|
||||
check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
|
||||
ok;
|
||||
check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
|
||||
#pstate{peername = Peername}) ->
|
||||
|
@ -777,6 +779,26 @@ check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
|
|||
false -> ok
|
||||
end.
|
||||
|
||||
check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
|
||||
ok;
|
||||
check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) ->
|
||||
try emqx_topic:validate(WillTopic) of
|
||||
true -> check_will_acl(ConnPkt, PState)
|
||||
catch error : _Error ->
|
||||
{error, ?RC_TOPIC_NAME_INVALID}
|
||||
end.
|
||||
|
||||
check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl})
|
||||
when not EnableAcl ->
|
||||
ok;
|
||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
|
||||
case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
|
||||
allow -> ok;
|
||||
deny ->
|
||||
?LOG(warning, "Cannot publish will message to ~p for acl checking failed", [WillTopic]),
|
||||
{error, ?RC_UNSPECIFIED_ERROR}
|
||||
end.
|
||||
|
||||
check_publish(Packet, PState) ->
|
||||
run_check_steps([fun check_pub_caps/2,
|
||||
fun check_pub_acl/2], Packet, PState).
|
||||
|
@ -902,25 +924,29 @@ flag(true) -> 1.
|
|||
%% Execute actions in case acl deny
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
|
||||
{error, ?RC_NOT_AUTHORIZED, PState};
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, ?RC_NOT_AUTHORIZED, PState};
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
|
||||
do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload),
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
|
||||
?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, ?RC_NOT_AUTHORIZED, PState};
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
|
||||
do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
|
||||
ReasonCodes, PState = #pstate{acl_deny_action = disconnect}) ->
|
||||
ReasonCodes, PState = #pstate{proto_ver = ProtoVer,
|
||||
acl_deny_action = disconnect}) ->
|
||||
case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
|
||||
true ->
|
||||
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
|
||||
{error, ?RC_NOT_AUTHORIZED, PState};
|
||||
{error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
|
||||
false ->
|
||||
{ok, PState}
|
||||
end;
|
||||
|
|
|
@ -133,6 +133,7 @@ compat(connack, 16#89) -> ?CONNACK_SERVER;
|
|||
compat(connack, 16#8A) -> ?CONNACK_AUTH;
|
||||
compat(connack, 16#8B) -> ?CONNACK_SERVER;
|
||||
compat(connack, 16#8C) -> ?CONNACK_AUTH;
|
||||
compat(connack, 16#90) -> ?CONNACK_SERVER;
|
||||
compat(connack, 16#97) -> ?CONNACK_SERVER;
|
||||
compat(connack, 16#9C) -> ?CONNACK_SERVER;
|
||||
compat(connack, 16#9D) -> ?CONNACK_SERVER;
|
||||
|
|
|
@ -35,13 +35,17 @@
|
|||
|
||||
all() ->
|
||||
[
|
||||
{group, mqtt_common},
|
||||
{group, mqttv4},
|
||||
{group, mqttv5},
|
||||
{group, acl}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[{mqttv4,
|
||||
[{mqtt_common,
|
||||
[sequence],
|
||||
[will_check]},
|
||||
{mqttv4,
|
||||
[sequence],
|
||||
[connect_v4,
|
||||
subscribe_v4]},
|
||||
|
@ -53,7 +57,6 @@ groups() ->
|
|||
[sequence],
|
||||
[acl_deny_action]}].
|
||||
|
||||
|
||||
init_per_suite(Config) ->
|
||||
[start_apps(App, SchemaFile, ConfigFile) ||
|
||||
{App, SchemaFile, ConfigFile}
|
||||
|
@ -436,6 +439,39 @@ acl_deny_action(_) ->
|
|||
emqx_zone:set_env(external, acl_deny_action, ignore),
|
||||
ok.
|
||||
|
||||
will_check(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
will_topic_check(0),
|
||||
will_acl_check(0).
|
||||
|
||||
will_topic_check(QoS) ->
|
||||
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
|
||||
{will_flag, true},
|
||||
{will_topic, <<"">>},
|
||||
{will_payload, <<"I have died">>},
|
||||
{will_qos, QoS}]),
|
||||
try emqx_client:connect(Client) of
|
||||
_ ->
|
||||
ok
|
||||
catch
|
||||
exit : _Reason ->
|
||||
false = is_process_alive(Client)
|
||||
end.
|
||||
|
||||
will_acl_check(QoS) ->
|
||||
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
|
||||
{will_flag, true},
|
||||
{will_topic, <<"acl_deny_action">>},
|
||||
{will_payload, <<"I have died">>},
|
||||
{will_qos, QoS}]),
|
||||
try emqx_client:connect(Client) of
|
||||
_ ->
|
||||
ok
|
||||
catch
|
||||
exit : _Reason ->
|
||||
false = is_process_alive(Client)
|
||||
end.
|
||||
|
||||
acl_deny_do_disconnect(publish, QoS, Topic) ->
|
||||
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
|
||||
{ok, _} = emqx_client:connect(Client),
|
||||
|
|
Loading…
Reference in New Issue