diff --git a/etc/emqx.conf b/etc/emqx.conf index bedf246ee..4703f5083 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -427,7 +427,7 @@ allow_anonymous = true ## TODO: Allow or deny if no ACL rules match. ## ## Value: allow | deny -acl_nomatch = deny +acl_nomatch = allow ## Default ACL File. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index c6fa7e30b..a0d2bc0e2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -653,12 +653,10 @@ end}. ]}. {mapping, "zone.$name.allow_anonymous", "emqx.zones", [ - {default, false}, {datatype, {enum, [true, false]}} ]}. {mapping, "zone.$name.acl_nomatch", "emqx.zones", [ - {default, deny}, {datatype, {enum, [allow, deny]}} ]}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5ebd54dd0..d0d0bc90b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -121,8 +121,9 @@ send_fun(Transport, Socket, Peername) -> fun(Data) -> try Transport:async_send(Socket, Data) of ok -> - ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}), - emqx_metrics:inc('bytes/sent', iolist_size(Data)), ok; + ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), + emqx_metrics:inc('bytes/sent', iolist_size(Data)), + ok; Error -> Error catch error:Error -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 184e03673..d9ad79e0a 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -46,7 +46,6 @@ mqtt_retain_available]). -define(SUBCAP_KEYS, [max_qos_allowed, max_topic_levels, - mqtt_retain_available, mqtt_shared_subscription, mqtt_wildcard_subscription]). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 67d1bffff..c8a751526 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -94,7 +94,11 @@ to_message(ClientId, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLI properties = Props}, payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Props}; + Msg#message{flags = #{dup => Dup, retain => Retain}, + headers = if + Props =:= undefined -> #{}; + true -> Props + end}; to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) -> undefined; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 077c7a00c..96925279f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -215,28 +215,28 @@ process(?CONNECT_PACKET( connack( case check_connect(Connect, PState1) of - ok -> - case authenticate(client(PState1), Password) of + {ok, PState2} -> + case authenticate(client(PState2), Password) of {ok, IsSuper} -> %% Maybe assign a clientId - PState2 = maybe_assign_client_id(PState1#pstate{is_super = IsSuper}), + PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}), %% Open session - case try_open_session(PState2) of + case try_open_session(PState3) of {ok, SPid, SP} -> - PState3 = PState2#pstate{session = SPid}, - ok = emqx_cm:register_client({client_id(PState3), self()}, info(PState3)), + PState4 = PState3#pstate{session = SPid}, + ok = emqx_cm:register_client({client_id(PState4), self()}, info(PState4)), %% Start keepalive - start_keepalive(Keepalive, PState3), + start_keepalive(Keepalive, PState4), %% TODO: 'Run hooks' before open_session? - emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState3)), + emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)), %% Success - {?RC_SUCCESS, SP, replvar(PState3)}; + {?RC_SUCCESS, SP, replvar(PState4)}; {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error], PState1), {?RC_UNSPECIFIED_ERROR, PState1} end; {error, Reason} -> - ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState1), + ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2), {?RC_NOT_AUTHORIZED, PState1} end; {error, ReasonCode} -> @@ -245,8 +245,8 @@ process(?CONNECT_PACKET( process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> case check_publish(Packet, PState) of - ok -> - do_publish(Packet, PState); + {ok, PState1} -> + do_publish(Packet, PState1); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState), {ok, PState} @@ -254,16 +254,16 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> case check_publish(Packet, PState) of - ok -> - do_publish(Packet, PState); + {ok, PState1} -> + do_publish(Packet, PState1); {error, ReasonCode} -> deliver({puback, PacketId, ReasonCode}, PState) end; process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) -> case check_publish(Packet, PState) of - ok -> - do_publish(Packet, PState); + {ok, PState1} -> + do_publish(Packet, PState1); {error, ReasonCode} -> deliver({pubrec, PacketId, ReasonCode}, PState) end; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 0c994d947..44c8f183f 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -310,7 +310,7 @@ close(SPid) -> init(#{zone := Zone, client_id := ClientId, - conn_pid := ClientPid, + client_pid := ClientPid, clean_start := CleanStart, username := Username, %% TODO: @@ -469,7 +469,7 @@ handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight} end; %% PUBREL: -handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> +handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) -> {noreply, case maps:take(PacketId, AwaitingRel) of {Msg, AwaitingRel1} -> @@ -503,7 +503,7 @@ handle_cast({resume, ClientPid}, await_rel_timer = AwaitTimer, expiry_timer = ExpireTimer}) -> - ?LOG(info, "Resumed by ~p", [ClientPid], State), + ?LOG(info, "Resumed by ~p ", [ClientPid], State), %% Cancel Timers lists:foreach(fun emqx_misc:cancel_timer/1, @@ -649,6 +649,7 @@ retry_delivery(Force, State = #state{inflight = Inflight}) -> State; false -> Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), + io:format("!!! Retry Delivery: ~p~n", [Msgs]), retry_delivery(Force, Msgs, os:timestamp(), State) end. diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 43e36c060..6b75256d0 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -118,8 +118,8 @@ add_path({Node, Word, Child}) -> %% @private %% @doc Match node with word or '+'. -match_node(root, [<<"$SYS">>|Words]) -> - match_node(<<"$SYS">>, Words, []); +match_node(root, [NodeId = <<$$, _/binary>>|Words]) -> + match_node(NodeId, Words, []); match_node(NodeId, Words) -> match_node(NodeId, Words, []).