Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-07-11 10:17:13 +08:00
commit 47d628f9d9
7 changed files with 43 additions and 59 deletions

View File

@ -209,7 +209,7 @@ idle(enter, _, State) ->
keep_state_and_data;
idle(timeout, _Timeout, State) ->
{stop, idle_timeout, State};
{stop, {shutdown, idle_timeout}, State};
idle(cast, {incoming, Packet}, State) ->
handle_incoming(Packet, fun(NState) ->

View File

@ -891,11 +891,11 @@ check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}})
check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
lists:foldr(
fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl ->
AllowTerm = {Ok, [{Topic, SubOpts}|Acc]},
fun({Topic, SubOpts}, {ok, Acc}) when EnableAcl ->
AllowTerm = {ok, [{Topic, SubOpts}|Acc]},
DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]},
do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm);
(TopicFilter, Acc) ->
(TopicFilter, {ok, Acc}) ->
{ok, [TopicFilter | Acc]}
end, {ok, []}, TopicFilters).

View File

@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) ->
-spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts);
(RawTopic) when is_binary(RawTopic) ->
emqx_topic:parse(RawTopic)
TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts);
(RawTopic) when is_binary(RawTopic) ->
emqx_topic:parse(RawTopic)
end, RawTopicFilters),
unsubscribe(SPid, undefined, #{}, TopicFilters).

View File

@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive,
case {ProtoState, Shutdown} of
{undefined, _} -> ok;
{_, {shutdown, Reason}} ->
emqx_protocol:terminate(Reason, ProtoState);
emqx_protocol:terminate(Reason, ProtoState),
exit(Reason);
{_, Error} ->
emqx_protocol:terminate(Error, ProtoState)
?LOG(error, "Unexpected terminated for ~p", [Error]),
emqx_protocol:terminate(Error, ProtoState),
exit(unknown)
end.
%%--------------------------------------------------------------------
@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
shutdown(Error, State#state{proto_state = NProtoState})
end.
ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined,
idle_timeout = IdleTimeout}) ->
@ -345,7 +346,7 @@ ensure_stats_timer(State) ->
shutdown(Reason, State) ->
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
self() ! {stop, Reason},
self() ! {stop, State#state{shutdown = {shutdown, Reason}}},
{ok, State}.
wsock_stats() ->

View File

@ -1,45 +0,0 @@
[{emqx,
[{plugins_loaded_file,"loaded_plugins"},
{plugins_etc_dir,"plugins/"},
{broker_sys_interval,60},
{cache_acl,true},
{allow_anonymous,true},
{license_file,"../../etc/emqx.lic"},
{protocol,[{max_clientid_len,1024},{max_packet_size,65536}]},
{client,
[{max_publish_rate,5},{idle_timeout,30000},{enable_stats,60000}]},
{session,
[{max_subscriptions,0},
{upgrade_qos,false},
{max_inflight,32},
{retry_interval,20000},
{max_awaiting_rel,100},
{await_rel_timeout,20000},
{enable_stats,60000},
{expiry_interval,7200000}]},
{mqueue,
[{priority,[]},
{type,simple},
{max_length,infinity},
{low_watermark,0.2},
{high_watermark,0.6},
{store_qos0,true}]},
{pubsub,[{pool_size,8},{by_clientid,true},{async,true}]},
{bridge,[{max_queue_len,10000},{ping_down_interval,1}]},
{listeners, []},
{sysmon,
[{long_gc,false},
{long_schedule,240},
{large_heap,8388608},
{busy_port,false},
{busy_dist_port,true}]}]},
{gen_rpc,
[{socket_keepalive_count,2},
{socket_keepalive_interval,5},
{socket_keepalive_idle,5},
{call_receive_timeout,15000},
{authentication_timeout,5000},
{send_timeout,5000},
{connect_timeout,5000},
{tcp_client_port,5369},
{tcp_server_port,7369}]}].

View File

@ -28,10 +28,23 @@
username = <<"admin">>,
password = <<"public">>})).
-define(WILL_TOPIC, <<"test/websocket/will">>).
-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"admin">>,
password = <<"public">>,
will_flag = true,
will_qos = ?QOS_1,
will_topic = ?WILL_TOPIC,
will_payload = <<"payload">>
})).
all() ->
[ t_ws_connect_api
, t_ws_auth_failure
, t_ws_other_type_frame
, t_ws_will
].
init_per_suite(Config) ->
@ -41,6 +54,22 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_ws_will(_Config) ->
{ok, ClientPid} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(ClientPid),
{ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1),
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS),
Packet = raw_send_serialize(?WILL_CLIENT),
ok = rfc6455_client:send_binary(WS, Packet),
{binary, Bin} = rfc6455_client:recv(WS),
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
exit(WS, abnomal),
?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))),
ok = emqx_client:disconnect(ClientPid),
ok.
t_ws_auth_failure(_Config) ->
application:set_env(emqx, allow_anonymous, false),
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),

View File

@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) ->
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
State#state{phase = closing}.
loop(State = #state{socket = Socket, ppid = PPid, data = Data,
phase = Phase}) ->
receive