Merge pull request #3295 from emqx/master

Auto-pull-request-by-2020-03-06
This commit is contained in:
JianBo He 2020-03-06 11:04:59 +08:00 committed by GitHub
commit 3256f51444
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 121 additions and 43 deletions

View File

@ -1886,7 +1886,9 @@ plugins.etc_dir = {{ platform_etc_dir }}/plugins/
## Value: File
plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins
## File to store loaded plugin names.
## The directory of extension plugins.
##
## Value: File
plugins.expand_plugins_dir = {{ platform_plugins_dir }}/
##--------------------------------------------------------------------

View File

@ -997,6 +997,8 @@ end}.
end, #{}, string:tokens(Val, ",")),
{mqueue_priorities, MqueuePriorities}
end;
("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,

View File

@ -287,7 +287,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
end;
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo}) ->
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
case emqx_packet:check(Packet) of
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
@ -296,7 +296,15 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
TopicFilters2
),
{ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
handle_out(suback, {PacketId, ReasonCodes}, NChannel);
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
lists:any(fun(ReasonCode) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
end, ReasonCodes) of
true ->
handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel);
false ->
handle_out(suback, {PacketId, ReasonCodes}, NChannel)
end;
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
@ -373,7 +381,8 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
%% Process Publish
%%--------------------------------------------------------------------
process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
Channel = #channel{clientinfo = #{zone := Zone}}) ->
case pipeline([fun process_alias/2,
fun check_pub_alias/2,
fun check_pub_acl/2,
@ -382,6 +391,19 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
{ok, NPacket, NChannel} ->
Msg = packet_to_message(NPacket, NChannel),
do_publish(PacketId, Msg, NChannel);
{error, ReasonCode, NChannel} when ReasonCode =:= ?RC_NOT_AUTHORIZED ->
?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
ignore ->
case QoS of
?QOS_0 -> {ok, NChannel};
_ ->
handle_out(puback, {PacketId, ReasonCode}, NChannel)
end;
disconnect ->
handle_out(disconnect, ReasonCode, NChannel)
end;
{error, ReasonCode, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -786,6 +808,9 @@ handle_info(Info, Channel) ->
handle_timeout(_TRef, {keepalive, _StatVal},
Channel = #channel{keepalive = undefined}) ->
{ok, Channel};
handle_timeout(_TRef, {keepalive, _StatVal},
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, {keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}) ->
case emqx_keepalive:check(StatVal, Keepalive) of
@ -796,6 +821,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
end;
handle_timeout(_TRef, retry_delivery,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, retry_delivery,
Channel = #channel{session = Session}) ->
case emqx_session:retry(Session) of
@ -809,6 +837,9 @@ handle_timeout(_TRef, retry_delivery,
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
end;
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}) ->
{ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
@ -981,6 +1012,9 @@ maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Us
false -> ok
end.
maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId})
when ClientId /= undefined ->
{ok, ClientInfo};
maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
%% Generate a rand clientId
{ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};

View File

@ -476,13 +476,18 @@ handle_timeout(_TRef, emit_stats, State =
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
handle_timeout(TRef, keepalive, State =
#state{transport = Transport, socket = Socket}) ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
handle_info({sock_error, Reason}, State)
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
socket = Socket,
channel = Channel})->
case emqx_channel:info(conn_state, Channel) of
disconnected -> {ok, State};
_ ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end
end;
handle_timeout(TRef, Msg, State) ->
@ -617,7 +622,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,

View File

@ -478,8 +478,9 @@ format_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) ->
io_lib:format("Topic=~s, PacketId=~p", [TopicName, PacketId]);
format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
io_lib:format("PacketId=~p", [PacketId]);
format_variable(#mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}) ->
io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]);
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}) ->

View File

@ -414,7 +414,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
NState = State#state{sockstate = blocked,
limiter = Limiter1,

View File

@ -45,7 +45,8 @@
all() ->
[{group, mqttv3},
{group, mqttv4},
{group, mqttv5}
{group, mqttv5},
{group, others}
].
groups() ->
@ -66,17 +67,26 @@ groups() ->
]},
{mqttv5, [non_parallel_tests],
[t_basic_with_props_v5
]},
{others, [non_parallel_tests],
[t_username_as_clientid,
t_certcn_as_clientid
]}
].
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
emqx_ct_helpers:start_apps([], fun set_special_confs/1),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
set_special_confs(emqx) ->
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
set_special_confs(_) ->
ok.
%%--------------------------------------------------------------------
%% Test cases for MQTT v3
%%--------------------------------------------------------------------
@ -110,10 +120,7 @@ t_cm_registry(_) ->
{_, Pid, _, _} = lists:keyfind(registry, 1, Info),
ignored = gen_server:call(Pid, <<"Unexpected call">>),
gen_server:cast(Pid, <<"Unexpected cast">>),
Pid ! <<"Unexpected info">>,
ok = application:stop(mnesia),
emqx_ct_helpers:stop_apps([]),
emqx_ct_helpers:start_apps([]).
Pid ! <<"Unexpected info">>.
t_will_message(_Config) ->
{ok, C1} = emqtt:start_link([{clean_start, true},
@ -263,6 +270,23 @@ t_basic(_Opts) ->
?assertEqual(3, length(recv_msgs(3))),
ok = emqtt:disconnect(C).
t_username_as_clientid(_) ->
emqx_zone:set_env(external, use_username_as_clientid, true),
Username = <<"usera">>,
{ok, C} = emqtt:start_link([{username, Username}]),
{ok, _} = emqtt:connect(C),
#{clientinfo := #{clientid := Username}} = emqx_cm:get_chan_info(Username),
emqtt:disconnect(C).
t_certcn_as_clientid(_) ->
CN = <<"0004.novalocal">>,
emqx_zone:set_env(external, use_username_as_clientid, true),
SslConf = emqx_ct_helpers:client_ssl_twoway(),
{ok, C} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
{ok, _} = emqtt:connect(C),
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),
emqtt:disconnect(C).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -74,6 +74,14 @@ receive_disconnect_reasoncode() ->
error("no disconnect packet")
end.
waiting_client_process_exit(C) ->
receive
{'EXIT', C, Reason} -> Reason;
_Oth -> error({got_another_message, _Oth})
after
1000 -> error({waiting_timeout, C})
end.
clean_retained(Topic) ->
{ok, Clean} = emqtt:start_link([{clean_start, true}]),
{ok, _} = emqtt:connect(Clean),
@ -102,6 +110,7 @@ t_basic_test(_) ->
%%--------------------------------------------------------------------
t_connect_clean_start(_) ->
process_flag(trap_exit, true),
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]),
{ok, _} = emqtt:connect(Client1),
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
@ -109,11 +118,19 @@ t_connect_clean_start(_) ->
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]),
{ok, _} = emqtt:connect(Client2),
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
?assertEqual(142, receive_disconnect_reasoncode()),
waiting_client_process_exit(Client1),
ok = emqtt:disconnect(Client2),
waiting_client_process_exit(Client2),
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]),
{ok, _} = emqtt:connect(Client3),
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
ok = emqtt:disconnect(Client3).
ok = emqtt:disconnect(Client3),
waiting_client_process_exit(Client3),
process_flag(trap_exit, false).
t_connect_will_message(_) ->
Topic = nth(1, ?TOPICS),
@ -396,7 +413,7 @@ t_connack_max_qos_allowed(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack1} = emqtt:connect(Client1),
?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9]
?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9]
{ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10]
{ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10]
@ -404,6 +421,7 @@ t_connack_max_qos_allowed(_) ->
{ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1),
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
waiting_client_process_exit(Client1),
{ok, Client2} = emqtt:start_link([
{proto_ver, v5},
@ -413,7 +431,8 @@ t_connack_max_qos_allowed(_) ->
{will_qos, 2}
]),
{error, Connack2} = emqtt:connect(Client2),
?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12]
?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12]
waiting_client_process_exit(Client2),
%% max_qos_allowed = 1
emqx_zone:set_env(external, max_qos_allowed, 1),
@ -422,7 +441,7 @@ t_connack_max_qos_allowed(_) ->
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack3} = emqtt:connect(Client3),
?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9]
?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9]
{ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10]
{ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10]
@ -430,6 +449,7 @@ t_connack_max_qos_allowed(_) ->
{ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2),
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
waiting_client_process_exit(Client3),
{ok, Client4} = emqtt:start_link([
{proto_ver, v5},
@ -439,11 +459,8 @@ t_connack_max_qos_allowed(_) ->
{will_qos, 2}
]),
{error, Connack4} = emqtt:connect(Client4),
?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12]
receive
{'EXIT', _, {shutdown,qos_not_supported}} -> ok
after 100 -> error("t_connack_max_qos_allowed")
end,
?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12]
waiting_client_process_exit(Client4),
%% max_qos_allowed = 2
emqx_zone:set_env(external, max_qos_allowed, 2),
@ -452,12 +469,10 @@ t_connack_max_qos_allowed(_) ->
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack5} = emqtt:connect(Client5),
?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9]
?assertEqual(2, maps:get('Maximum-QoS', Connack5)), %% [MQTT-3.2.2-9]
ok = emqtt:disconnect(Client5),
waiting_client_process_exit(Client5),
receive {'EXIT', _, _} -> ok
after 100 -> ok
end,
process_flag(trap_exit, false).
t_connack_assigned_clienid(_) ->
@ -499,10 +514,8 @@ t_publish_wildtopic(_) ->
{ok, _} = emqtt:connect(Client1),
ok = emqtt:publish(Client1, Topic, <<"error topic">>),
?assertEqual(144, receive_disconnect_reasoncode()),
waiting_client_process_exit(Client1),
receive {'EXIT', _, _} -> ok
after 100 -> ok
end,
process_flag(trap_exit, false).
t_publish_payload_format_indicator(_) ->
@ -525,6 +538,7 @@ t_publish_topic_alias(_) ->
{ok, _} = emqtt:connect(Client1),
ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8]
waiting_client_process_exit(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client2),
@ -533,10 +547,8 @@ t_publish_topic_alias(_) ->
ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12]
ok = emqtt:disconnect(Client2),
waiting_client_process_exit(Client2),
receive {'EXIT', _, _} -> ok
after 100 -> ok
end,
process_flag(trap_exit, false).
t_publish_response_topic(_) ->
@ -547,10 +559,8 @@ t_publish_response_topic(_) ->
{ok, _} = emqtt:connect(Client1),
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]),
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
waiting_client_process_exit(Client1),
receive {'EXIT', _, _} -> ok
after 100 -> ok
end,
process_flag(trap_exit, false).
t_publish_properties(_) ->