Use 'peerhost' to replace 'peername'

This commit is contained in:
Feng Lee 2019-09-18 19:17:30 +08:00
parent 4764a7707c
commit 00f3a2f939
6 changed files with 17 additions and 231 deletions

View File

@ -108,9 +108,9 @@ match_who(#{client_id := ClientId}, {client, ClientId}) ->
true;
match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#{peername := undefined}, {ipaddr, _Tup}) ->
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#{peername := {IP, _}}, {ipaddr, CIDR}) ->
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(Client, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->

View File

@ -72,8 +72,7 @@ start_link() ->
-spec(check(emqx_types:client()) -> boolean()).
check(#{client_id := ClientId,
username := Username,
peername := {IPAddr, _}
}) ->
peerhost := IPAddr}) ->
ets:member(?BANNED_TAB, {client_id, ClientId})
orelse ets:member(?BANNED_TAB, {username, Username})
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
@ -82,11 +81,10 @@ check(#{client_id := ClientId,
add(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?BANNED_TAB, Banned).
-spec(delete({client_id, emqx_types:client_id()} |
{username, emqx_types:username()} |
{peername, emqx_types:peername()}) -> ok).
delete(Key) ->
mnesia:dirty_delete(?BANNED_TAB, Key).
-spec(delete({client_id, emqx_types:client_id()}
| {username, emqx_types:username()}
| {peerhost, emqx_types:peerhost()}) -> ok).
delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key).
info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey).

View File

@ -52,7 +52,7 @@
-record(flapping, {
client_id :: emqx_types:client_id(),
peername :: emqx_types:peername(),
peerhost :: emqx_types:peerhost(),
started_at :: pos_integer(),
detect_cnt :: pos_integer(),
banned_at :: pos_integer()
@ -84,7 +84,7 @@ check(ClientId, #{banned_interval := Interval}) ->
-spec(detect(emqx_types:client()) -> boolean()).
detect(Client) -> detect(Client, get_policy()).
detect(#{client_id := ClientId, peername := Peername},
detect(#{client_id := ClientId, peerhost := PeerHost},
Policy = #{threshold := Threshold}) ->
try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
Cnt when Cnt < Threshold -> false;
@ -98,7 +98,7 @@ detect(#{client_id := ClientId, peername := Peername},
error:badarg ->
%% Create a flapping record.
Flapping = #flapping{client_id = ClientId,
peername = Peername,
peerhost = PeerHost,
started_at = emqx_time:now_ms(),
detect_cnt = 1
},
@ -132,7 +132,7 @@ handle_call(Req, _From, State) ->
{reply, ignored, State}.
handle_cast({detected, Flapping = #flapping{client_id = ClientId,
peername = Peername,
peerhost = PeerHost,
started_at = StartedAt,
detect_cnt = DetectCnt},
#{duration := Duration}}, State) ->
@ -140,7 +140,7 @@ handle_cast({detected, Flapping = #flapping{client_id = ClientId,
true -> %% Flapping happened:(
%% Log first
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:format(Peername), DetectCnt, Duration]),
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
%% TODO: Send Alarm
%% Banned.
BannedFlapping = Flapping#flapping{client_id = {banned, ClientId},
@ -149,7 +149,7 @@ handle_cast({detected, Flapping = #flapping{client_id = ClientId,
ets:insert(?FLAPPING_TAB, BannedFlapping);
false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:format(Peername), DetectCnt, Interval]),
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]),
ets:delete_object(?FLAPPING_TAB, Flapping)
end,
{noreply, State};

View File

@ -1,133 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT Protocol
-module(emqx_protocol).
-include("types.hrl").
-include("emqx_mqtt.hrl").
-export([ init/2
, info/1
, info/2
, attrs/1
]).
-export([ find_alias/2
, save_alias/3
, clear_will_msg/1
]).
-export_type([protocol/0]).
-record(protocol, {
%% MQTT Proto Name
proto_name :: binary(),
%% MQTT Proto Version
proto_ver :: emqx_types:ver(),
%% Clean Start Flag
clean_start :: boolean(),
%% MQTT Keepalive interval
keepalive :: non_neg_integer(),
%% ClientId in CONNECT Packet
client_id :: emqx_types:client_id(),
%% Username in CONNECT Packet
username :: emqx_types:username(),
%% MQTT Will Msg
will_msg :: emqx_types:message(),
%% MQTT Topic Aliases
topic_aliases :: maybe(map()),
%% MQTT Topic Alias Maximum
alias_maximum :: maybe(map())
}).
-opaque(protocol() :: #protocol{}).
-define(INFO_KEYS, record_info(fields, protocol)).
-define(ATTR_KEYS, [proto_name, proto_ver, clean_start, keepalive]).
-spec(init(#mqtt_packet_connect{}, atom()) -> protocol()).
init(#mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
clean_start = CleanStart,
keepalive = Keepalive,
properties = Properties,
client_id = ClientId,
username = Username} = ConnPkt, Zone) ->
WillMsg = emqx_packet:will_msg(ConnPkt),
#protocol{proto_name = ProtoName,
proto_ver = ProtoVer,
clean_start = CleanStart,
keepalive = Keepalive,
client_id = ClientId,
username = Username,
will_msg = WillMsg,
alias_maximum = #{outbound => emqx_mqtt_props:get_property('Topic-Alias-Maximum', Properties, 0),
inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone), 0)}
}.
-spec(info(protocol()) -> emqx_types:infos()).
info(Proto) ->
maps:from_list(info(?INFO_KEYS, Proto)).
-spec(info(atom()|list(atom()), protocol()) -> term()).
info(Keys, Proto) when is_list(Keys) ->
[{Key, info(Key, Proto)} || Key <- Keys];
info(proto_name, #protocol{proto_name = ProtoName}) ->
ProtoName;
info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
ProtoVer;
info(clean_start, #protocol{clean_start = CleanStart}) ->
CleanStart;
info(keepalive, #protocol{keepalive = Keepalive}) ->
Keepalive;
info(client_id, #protocol{client_id = ClientId}) ->
ClientId;
info(username, #protocol{username = Username}) ->
Username;
info(will_msg, #protocol{will_msg = WillMsg}) ->
WillMsg;
info(will_delay_interval, #protocol{will_msg = undefined}) ->
0;
info(will_delay_interval, #protocol{will_msg = WillMsg}) ->
emqx_message:get_header('Will-Delay-Interval', WillMsg, 0);
info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
Aliases;
info(alias_maximum, #protocol{alias_maximum = AliasMaximum}) ->
AliasMaximum.
-spec(attrs(protocol()) -> emqx_types:attrs()).
attrs(Proto) ->
maps:from_list(info(?ATTR_KEYS, Proto)).
-spec(find_alias(emqx_types:alias_id(), protocol())
-> {ok, emqx_types:topic()} | false).
find_alias(_AliasId, #protocol{topic_aliases = undefined}) ->
false;
find_alias(AliasId, #protocol{topic_aliases = Aliases}) ->
maps:find(AliasId, Aliases).
-spec(save_alias(emqx_types:alias_id(), emqx_types:topic(), protocol())
-> protocol()).
save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = undefined}) ->
Proto#protocol{topic_aliases = #{AliasId => Topic}};
save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = Aliases}) ->
Proto#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}.
clear_will_msg(Protocol) ->
Protocol#protocol{will_msg = undefined}.

View File

@ -36,6 +36,7 @@
, client_id/0
, username/0
, password/0
, peerhost/0
, peername/0
, protocol/0
]).
@ -103,7 +104,7 @@
}).
-type(client() :: #{zone := zone(),
conn_mod := maybe(module()),
peername := peername(),
peerhost := peerhost(),
sockname := peername(),
client_id := client_id(),
username := username(),
@ -117,9 +118,10 @@
anonymous => boolean(),
atom() => term()
}).
-type(client_id() :: binary() | atom()).
-type(client_id() :: binary()|atom()).
-type(username() :: maybe(binary())).
-type(password() :: maybe(binary())).
-type(peerhost() :: inet:ip_address()).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(auth_result() :: success

View File

@ -1,81 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_protocol_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
[{proto, init_protocol()}|Config].
init_protocol() ->
emqx_protocol:init(#mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V5,
is_bridge = false,
clean_start = true,
keepalive = 30,
properties = #{},
client_id = <<"clientid">>,
username = <<"username">>,
password = <<"passwd">>
}, testing).
end_per_suite(_Config) -> ok.
t_init_info_1(Config) ->
Proto = proplists:get_value(proto, Config),
?assertEqual(#{proto_name => <<"MQTT">>,
proto_ver => ?MQTT_PROTO_V5,
clean_start => true,
keepalive => 30,
will_msg => undefined,
client_id => <<"clientid">>,
username => <<"username">>,
topic_aliases => undefined,
alias_maximum => #{outbound => 0, inbound => 0}
}, emqx_protocol:info(Proto)).
t_init_info_2(Config) ->
Proto = proplists:get_value(proto, Config),
?assertEqual(<<"MQTT">>, emqx_protocol:info(proto_name, Proto)),
?assertEqual(?MQTT_PROTO_V5, emqx_protocol:info(proto_ver, Proto)),
?assertEqual(true, emqx_protocol:info(clean_start, Proto)),
?assertEqual(30, emqx_protocol:info(keepalive, Proto)),
?assertEqual(<<"clientid">>, emqx_protocol:info(client_id, Proto)),
?assertEqual(<<"username">>, emqx_protocol:info(username, Proto)),
?assertEqual(undefined, emqx_protocol:info(will_msg, Proto)),
?assertEqual(0, emqx_protocol:info(will_delay_interval, Proto)),
?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)),
?assertEqual(#{outbound => 0, inbound => 0}, emqx_protocol:info(alias_maximum, Proto)).
t_find_save_alias(Config) ->
Proto = proplists:get_value(proto, Config),
?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)),
?assertEqual(false, emqx_protocol:find_alias(1, Proto)),
Proto1 = emqx_protocol:save_alias(1, <<"t1">>, Proto),
Proto2 = emqx_protocol:save_alias(2, <<"t2">>, Proto1),
?assertEqual(#{1 => <<"t1">>, 2 => <<"t2">>},
emqx_protocol:info(topic_aliases, Proto2)),
?assertEqual({ok, <<"t1">>}, emqx_protocol:find_alias(1, Proto2)),
?assertEqual({ok, <<"t2">>}, emqx_protocol:find_alias(2, Proto2)).