From d4932533ca1043b6f1d8d6b9b1fe22800ff5e6ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=A5=87=E6=80=AA?= Date: Fri, 10 Apr 2020 19:45:18 +0800 Subject: [PATCH] Auth packet (#3374) --- src/emqx_channel.erl | 120 ++++++++++++++++++++++++++++++------ src/emqx_reason_codes.erl | 1 + test/emqx_channel_SUITE.erl | 43 ++++++++++++- 3 files changed, 142 insertions(+), 22 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1a8839891..4490ca40f 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -71,6 +71,8 @@ topic_aliases :: emqx_types:topic_aliases(), %% MQTT Topic Alias Maximum alias_maximum :: maybe(map()), + %% Authentication Data Cache + auth_cache :: maybe(map()), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Conn State @@ -185,6 +187,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, topic_aliases = #{inbound => #{}, outbound => #{} }, + auth_cache = #{}, timers = #{}, conn_state = idle, takeover = false, @@ -216,10 +219,41 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun check_banned/2, fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of - {ok, NConnPkt, NChannel} -> - process_connect(NConnPkt, NChannel); + {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> + NChannel1 = NChannel#channel{ + will_msg = emqx_packet:will_msg(NConnPkt), + alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) + }, + case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of + {ok, Properties, NChannel2} -> + process_connect(Properties, ensure_connected(NChannel2)); + {continue, Properties, NChannel2} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2); + {error, ReasonCode, NChannel2} -> + handle_out(connack, ReasonCode, NChannel2) + end; {error, ReasonCode, NChannel} -> - handle_out(connack, {ReasonCode, ConnPkt}, NChannel) + handle_out(connack, ReasonCode, NChannel) + end; + +handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties), Channel) -> + case enhanced_auth(Packet, Channel) of + {ok, NProperties, NChannel} -> + process_connect(NProperties, ensure_connected(NChannel)); + {continue, NProperties, NChannel} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel); + {error, NReasonCode, NChannel} -> + handle_out(connack, NReasonCode, NChannel) + end; + +handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties), Channel) -> + case enhanced_auth(Packet, Channel) of + {ok, NProperties, NChannel} -> + handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel); + {continue, NProperties, NChannel} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel); + {error, NReasonCode, NChannel} -> + handle_out(disconnect, NReasonCode, NChannel) end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> @@ -362,24 +396,23 @@ handle_in(Packet, Channel) -> %% Process Connect %%-------------------------------------------------------------------- -process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, - Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> +process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanStart} = ConnInfo, clientinfo = ClientInfo}) -> case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, - handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel); + handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel); {ok, #{session := Session, present := true, pendings := Pendings}} -> Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), NChannel = Channel#channel{session = Session, resuming = true, pendings = Pendings1 }, - handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel); + handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel); {error, client_id_unavailable} -> - handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel); + handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); {error, Reason} -> ?LOG(error, "Failed to open session due to ~p", [Reason]), - handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel) + handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel) end. %%-------------------------------------------------------------------- @@ -579,18 +612,17 @@ not_nacked({deliver, _Topic, Msg}) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}). -handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 - ], #{}, Channel), + ], Props, Channel), NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps), return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), - ensure_keepalive(NAckProps, - ensure_connected(ConnPkt, Channel))); + ensure_keepalive(NAckProps, Channel)); -handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) -> Reason = emqx_reason_codes:name(ReasonCode), AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()), AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of @@ -643,6 +675,9 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) -> handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) -> {ok, {close, ReasonName}, Channel}; +handle_out(auth, {ReasonCode, Properties}, Channel) -> + {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel}; + handle_out(Type, Data, Channel) -> ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]), {ok, Channel}. @@ -1069,6 +1104,55 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, is_anonymous(#{anonymous := true}) -> true; is_anonymous(_AuthResult) -> false. +%%-------------------------------------------------------------------- +%% Enhanced Authentication + +enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = Protover, + properties = Properties + }), Channel) -> + case Protover of + ?MQTT_PROTO_V5 -> + AuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined), + AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined), + do_enhanced_auth(AuthMethod, AuthData, Channel); + _ -> + {ok, #{}, Channel} + end; + +enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> + AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined), + NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined), + AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined), + case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of + true -> + {error, emqx_reason_codes:connack_error(bad_authentication_method), Channel}; + false -> + do_enhanced_auth(AuthMethod, AuthData, Channel) + end. + +do_enhanced_auth(undefined, undefined, Channel) -> + {ok, #{}, Channel}; +do_enhanced_auth(undefined, _AuthData, Channel) -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel}; +do_enhanced_auth(_AuthMethod, undefined, Channel) -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel}; +do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) -> + case do_auth_check(AuthMethod, AuthData, Cache) of + ok -> {ok, #{}, Channel#channel{auth_cache = #{}}}; + {ok, NAuthData} -> + NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, + {ok, NProperties, Channel#channel{auth_cache = #{}}}; + {continue, NAuthData, NCache} -> + NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, + {continue, NProperties, Channel#channel{auth_cache = NCache}}; + {error, _Reason} -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel} + end. + +do_auth_check(_AuthMethod, _AuthData, _AuthDataCache) -> + {error, not_authorized}. + %%-------------------------------------------------------------------- %% Process Topic Alias @@ -1259,14 +1343,12 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Ensure connected -ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> +ensure_connected(Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, - conn_state = connected, - will_msg = emqx_packet:will_msg(ConnPkt), - alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) + conn_state = connected }. %%-------------------------------------------------------------------- diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index c3ee2b87e..ece285767 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -169,6 +169,7 @@ compat(_Other, _Code) -> undefined. frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE; frame_error(_) -> ?RC_MALFORMED_PACKET. +connack_error(protocol_error) -> ?RC_PROTOCOL_ERROR; connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 342ba8bae..a94095b2b 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -112,6 +112,43 @@ t_handle_in_unexpected_connect_packet(_) -> {ok, [{outgoing, Packet}, {close, protocol_error}], Channel} = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel). +t_handle_in_connect_auth_failed(_) -> + ConnPkt = #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V5, + is_bridge = false, + clean_start = true, + keepalive = 30, + properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + clientid = <<"clientid">>, + username = <<"username">> + }, + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = + emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), channel(#{conn_state => idle})). + +t_handle_in_continue_auth(_) -> + Properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + {shutdown, bad_authentication_method, ?CONNACK_PACKET(?RC_BAD_AUTHENTICATION_METHOD), _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel()), + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})). + +t_handle_in_re_auth(_) -> + Properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel()), + {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_NOT_AUTHORIZED)}, {close, not_authorized}], _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})). + t_handle_in_qos0_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Channel = channel(#{conn_state => connected}), @@ -286,7 +323,7 @@ t_process_connect(_) -> {ok, #{session => session(), present => false}} end), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = - emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})). + emqx_channel:process_connect(#{}, channel(#{conn_state => idle})). t_process_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -346,12 +383,12 @@ t_handle_out_publish_nl(_) -> t_handle_out_connack_sucess(_) -> {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = - emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()), + emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()), ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_out_connack_failure(_) -> {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} = - emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()). + emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()). t_handle_out_puback(_) -> Channel = channel(#{conn_state => connected}),