Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
1276afe96a
|
@ -1886,7 +1886,9 @@ plugins.etc_dir = {{ platform_etc_dir }}/plugins/
|
|||
## Value: File
|
||||
plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins
|
||||
|
||||
## File to store loaded plugin names.
|
||||
## The directory of extension plugins.
|
||||
##
|
||||
## Value: File
|
||||
plugins.expand_plugins_dir = {{ platform_plugins_dir }}/
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
|
|
|
@ -997,6 +997,8 @@ end}.
|
|||
end, #{}, string:tokens(Val, ",")),
|
||||
{mqueue_priorities, MqueuePriorities}
|
||||
end;
|
||||
("mountpoint", Val) ->
|
||||
{mountpoint, iolist_to_binary(Val)};
|
||||
(Opt, Val) ->
|
||||
{list_to_atom(Opt), Val}
|
||||
end,
|
||||
|
|
|
@ -287,7 +287,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|||
end;
|
||||
|
||||
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||
Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||
case emqx_packet:check(Packet) of
|
||||
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
||||
TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
||||
|
@ -296,7 +296,15 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|||
TopicFilters2
|
||||
),
|
||||
{ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
||||
handle_out(suback, {PacketId, ReasonCodes}, NChannel);
|
||||
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
|
||||
lists:any(fun(ReasonCode) ->
|
||||
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
||||
end, ReasonCodes) of
|
||||
true ->
|
||||
handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel);
|
||||
false ->
|
||||
handle_out(suback, {PacketId, ReasonCodes}, NChannel)
|
||||
end;
|
||||
{error, ReasonCode} ->
|
||||
handle_out(disconnect, ReasonCode, Channel)
|
||||
end;
|
||||
|
@ -373,7 +381,8 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|||
%% Process Publish
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
|
||||
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
||||
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
||||
case pipeline([fun process_alias/2,
|
||||
fun check_pub_alias/2,
|
||||
fun check_pub_acl/2,
|
||||
|
@ -382,6 +391,19 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) ->
|
|||
{ok, NPacket, NChannel} ->
|
||||
Msg = packet_to_message(NPacket, NChannel),
|
||||
do_publish(PacketId, Msg, NChannel);
|
||||
{error, ReasonCode, NChannel} when ReasonCode =:= ?RC_NOT_AUTHORIZED ->
|
||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
|
||||
ignore ->
|
||||
case QoS of
|
||||
?QOS_0 -> {ok, NChannel};
|
||||
_ ->
|
||||
handle_out(puback, {PacketId, ReasonCode}, NChannel)
|
||||
end;
|
||||
disconnect ->
|
||||
handle_out(disconnect, ReasonCode, NChannel)
|
||||
end;
|
||||
{error, ReasonCode, NChannel} ->
|
||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
||||
|
@ -786,6 +808,9 @@ handle_info(Info, Channel) ->
|
|||
handle_timeout(_TRef, {keepalive, _StatVal},
|
||||
Channel = #channel{keepalive = undefined}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, {keepalive, _StatVal},
|
||||
Channel = #channel{conn_state = disconnected}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, {keepalive, StatVal},
|
||||
Channel = #channel{keepalive = Keepalive}) ->
|
||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||
|
@ -796,6 +821,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
|||
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
|
||||
end;
|
||||
|
||||
handle_timeout(_TRef, retry_delivery,
|
||||
Channel = #channel{conn_state = disconnected}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, retry_delivery,
|
||||
Channel = #channel{session = Session}) ->
|
||||
case emqx_session:retry(Session) of
|
||||
|
@ -809,6 +837,9 @@ handle_timeout(_TRef, retry_delivery,
|
|||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||
end;
|
||||
|
||||
handle_timeout(_TRef, expire_awaiting_rel,
|
||||
Channel = #channel{conn_state = disconnected}) ->
|
||||
{ok, Channel};
|
||||
handle_timeout(_TRef, expire_awaiting_rel,
|
||||
Channel = #channel{session = Session}) ->
|
||||
case emqx_session:expire(awaiting_rel, Session) of
|
||||
|
@ -981,6 +1012,9 @@ maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Us
|
|||
false -> ok
|
||||
end.
|
||||
|
||||
maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId})
|
||||
when ClientId /= undefined ->
|
||||
{ok, ClientInfo};
|
||||
maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
|
||||
%% Generate a rand clientId
|
||||
{ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
|
||||
|
|
|
@ -476,13 +476,18 @@ handle_timeout(_TRef, emit_stats, State =
|
|||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
{ok, State#state{stats_timer = undefined}};
|
||||
|
||||
handle_timeout(TRef, keepalive, State =
|
||||
#state{transport = Transport, socket = Socket}) ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
|
||||
socket = Socket,
|
||||
channel = Channel})->
|
||||
case emqx_channel:info(conn_state, Channel) of
|
||||
disconnected -> {ok, State};
|
||||
_ ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
handle_timeout(TRef, {keepalive, RecvOct}, State);
|
||||
{error, Reason} ->
|
||||
handle_info({sock_error, Reason}, State)
|
||||
end
|
||||
end;
|
||||
|
||||
handle_timeout(TRef, Msg, State) ->
|
||||
|
@ -617,7 +622,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|||
{ok, Limiter1} ->
|
||||
State#state{limiter = Limiter1};
|
||||
{pause, Time, Limiter1} ->
|
||||
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
|
||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
||||
TRef = start_timer(Time, limit_timeout),
|
||||
State#state{sockstate = blocked,
|
||||
limiter = Limiter1,
|
||||
|
|
|
@ -478,8 +478,9 @@ format_variable(#mqtt_packet_publish{topic_name = TopicName,
|
|||
packet_id = PacketId}) ->
|
||||
io_lib:format("Topic=~s, PacketId=~p", [TopicName, PacketId]);
|
||||
|
||||
format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
|
||||
io_lib:format("PacketId=~p", [PacketId]);
|
||||
format_variable(#mqtt_packet_puback{packet_id = PacketId,
|
||||
reason_code = ReasonCode}) ->
|
||||
io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]);
|
||||
|
||||
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
|
||||
topic_filters = TopicFilters}) ->
|
||||
|
|
|
@ -414,7 +414,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|||
{ok, Limiter1} ->
|
||||
State#state{limiter = Limiter1};
|
||||
{pause, Time, Limiter1} ->
|
||||
?LOG(debug, "Pause ~pms due to rate limit", [Time]),
|
||||
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
||||
TRef = start_timer(Time, limit_timeout),
|
||||
NState = State#state{sockstate = blocked,
|
||||
limiter = Limiter1,
|
||||
|
|
|
@ -45,7 +45,8 @@
|
|||
all() ->
|
||||
[{group, mqttv3},
|
||||
{group, mqttv4},
|
||||
{group, mqttv5}
|
||||
{group, mqttv5},
|
||||
{group, others}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
|
@ -66,17 +67,26 @@ groups() ->
|
|||
]},
|
||||
{mqttv5, [non_parallel_tests],
|
||||
[t_basic_with_props_v5
|
||||
]},
|
||||
{others, [non_parallel_tests],
|
||||
[t_username_as_clientid,
|
||||
t_certcn_as_clientid
|
||||
]}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_ct_helpers:start_apps([], fun set_special_confs/1),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
set_special_confs(emqx) ->
|
||||
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
|
||||
set_special_confs(_) ->
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases for MQTT v3
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -110,10 +120,7 @@ t_cm_registry(_) ->
|
|||
{_, Pid, _, _} = lists:keyfind(registry, 1, Info),
|
||||
ignored = gen_server:call(Pid, <<"Unexpected call">>),
|
||||
gen_server:cast(Pid, <<"Unexpected cast">>),
|
||||
Pid ! <<"Unexpected info">>,
|
||||
ok = application:stop(mnesia),
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_ct_helpers:start_apps([]).
|
||||
Pid ! <<"Unexpected info">>.
|
||||
|
||||
t_will_message(_Config) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true},
|
||||
|
@ -263,6 +270,23 @@ t_basic(_Opts) ->
|
|||
?assertEqual(3, length(recv_msgs(3))),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
t_username_as_clientid(_) ->
|
||||
emqx_zone:set_env(external, use_username_as_clientid, true),
|
||||
Username = <<"usera">>,
|
||||
{ok, C} = emqtt:start_link([{username, Username}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
#{clientinfo := #{clientid := Username}} = emqx_cm:get_chan_info(Username),
|
||||
emqtt:disconnect(C).
|
||||
|
||||
t_certcn_as_clientid(_) ->
|
||||
CN = <<"0004.novalocal">>,
|
||||
emqx_zone:set_env(external, use_username_as_clientid, true),
|
||||
SslConf = emqx_ct_helpers:client_ssl_twoway(),
|
||||
{ok, C} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),
|
||||
emqtt:disconnect(C).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -74,6 +74,14 @@ receive_disconnect_reasoncode() ->
|
|||
error("no disconnect packet")
|
||||
end.
|
||||
|
||||
waiting_client_process_exit(C) ->
|
||||
receive
|
||||
{'EXIT', C, Reason} -> Reason;
|
||||
_Oth -> error({got_another_message, _Oth})
|
||||
after
|
||||
1000 -> error({waiting_timeout, C})
|
||||
end.
|
||||
|
||||
clean_retained(Topic) ->
|
||||
{ok, Clean} = emqtt:start_link([{clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Clean),
|
||||
|
@ -102,6 +110,7 @@ t_basic_test(_) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
t_connect_clean_start(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
|
||||
|
@ -109,11 +118,19 @@ t_connect_clean_start(_) ->
|
|||
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
|
||||
?assertEqual(142, receive_disconnect_reasoncode()),
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
ok = emqtt:disconnect(Client2),
|
||||
waiting_client_process_exit(Client2),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
|
||||
ok = emqtt:disconnect(Client3).
|
||||
ok = emqtt:disconnect(Client3),
|
||||
waiting_client_process_exit(Client3),
|
||||
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_connect_will_message(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
@ -396,7 +413,7 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack1} = emqtt:connect(Client1),
|
||||
?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9]
|
||||
?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9]
|
||||
|
||||
{ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10]
|
||||
|
@ -404,6 +421,7 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
{ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
|
@ -413,7 +431,8 @@ t_connack_max_qos_allowed(_) ->
|
|||
{will_qos, 2}
|
||||
]),
|
||||
{error, Connack2} = emqtt:connect(Client2),
|
||||
?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12]
|
||||
?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12]
|
||||
waiting_client_process_exit(Client2),
|
||||
|
||||
%% max_qos_allowed = 1
|
||||
emqx_zone:set_env(external, max_qos_allowed, 1),
|
||||
|
@ -422,7 +441,7 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack3} = emqtt:connect(Client3),
|
||||
?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9]
|
||||
?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9]
|
||||
|
||||
{ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10]
|
||||
|
@ -430,6 +449,7 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
{ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
|
||||
waiting_client_process_exit(Client3),
|
||||
|
||||
{ok, Client4} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
|
@ -439,11 +459,8 @@ t_connack_max_qos_allowed(_) ->
|
|||
{will_qos, 2}
|
||||
]),
|
||||
{error, Connack4} = emqtt:connect(Client4),
|
||||
?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12]
|
||||
receive
|
||||
{'EXIT', _, {shutdown,qos_not_supported}} -> ok
|
||||
after 100 -> error("t_connack_max_qos_allowed")
|
||||
end,
|
||||
?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12]
|
||||
waiting_client_process_exit(Client4),
|
||||
|
||||
%% max_qos_allowed = 2
|
||||
emqx_zone:set_env(external, max_qos_allowed, 2),
|
||||
|
@ -452,12 +469,10 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack5} = emqtt:connect(Client5),
|
||||
?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9]
|
||||
?assertEqual(2, maps:get('Maximum-QoS', Connack5)), %% [MQTT-3.2.2-9]
|
||||
ok = emqtt:disconnect(Client5),
|
||||
waiting_client_process_exit(Client5),
|
||||
|
||||
receive {'EXIT', _, _} -> ok
|
||||
after 100 -> ok
|
||||
end,
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_connack_assigned_clienid(_) ->
|
||||
|
@ -499,10 +514,8 @@ t_publish_wildtopic(_) ->
|
|||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, <<"error topic">>),
|
||||
?assertEqual(144, receive_disconnect_reasoncode()),
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
receive {'EXIT', _, _} -> ok
|
||||
after 100 -> ok
|
||||
end,
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_payload_format_indicator(_) ->
|
||||
|
@ -525,6 +538,7 @@ t_publish_topic_alias(_) ->
|
|||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8]
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
@ -533,10 +547,8 @@ t_publish_topic_alias(_) ->
|
|||
ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12]
|
||||
ok = emqtt:disconnect(Client2),
|
||||
waiting_client_process_exit(Client2),
|
||||
|
||||
receive {'EXIT', _, _} -> ok
|
||||
after 100 -> ok
|
||||
end,
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_response_topic(_) ->
|
||||
|
@ -547,10 +559,8 @@ t_publish_response_topic(_) ->
|
|||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
receive {'EXIT', _, _} -> ok
|
||||
after 100 -> ok
|
||||
end,
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_properties(_) ->
|
||||
|
|
Loading…
Reference in New Issue