diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 699963644..b1396db3c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -384,7 +384,7 @@ process(?CONNECT_PACKET( PState4 = PState3#protocol{session = Session, connected = true, credentials = keepsafety(Credentials0)}, ok = emqx_cm:register_channel(client_id(PState4)), - true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), + ok = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive start_keepalive(Keepalive, PState4), %% Success @@ -471,7 +471,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), {ok, TopicFilters} -> TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), - {ok, ReasonCodes, NSession} = emqx_session:subscribe(TopicFilters1, Session), + {ok, ReasonCodes, NSession} = emqx_session:subscribe(Credentials, TopicFilters1, Session), handle_out({suback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); {error, TopicFilters} -> {SubTopics, ReasonCodes} = @@ -490,7 +490,7 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters), - {ok, ReasonCodes, NSession} = emqx_session:unsubscribe(TopicFilters1, Session), + {ok, ReasonCodes, NSession} = emqx_session:unsubscribe(Credentials, TopicFilters1, Session), handle_out({unsuback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); process(?PACKET(?PINGREQ), PState) -> @@ -555,16 +555,16 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), puback(?QOS_0, _PacketId, _Result, PState) -> {ok, PState}; -puback(?QOS_1, PacketId, {ok, []}, PState) -> +puback(?QOS_1, PacketId, [], PState) -> handle_out({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); %%TODO: calc the deliver count? -puback(?QOS_1, PacketId, {ok, _Result}, PState) -> +puback(?QOS_1, PacketId, Result, PState) when is_list(Result) -> handle_out({puback, PacketId, ?RC_SUCCESS}, PState); puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> handle_out({puback, PacketId, ReasonCode}, PState); -puback(?QOS_2, PacketId, {ok, []}, PState) -> +puback(?QOS_2, PacketId, [], PState) -> handle_out({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); -puback(?QOS_2, PacketId, {ok, _Result}, PState) -> +puback(?QOS_2, PacketId, Result, PState) when is_list(Result) -> handle_out({pubrec, PacketId, ?RC_SUCCESS}, PState); puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> handle_out({pubrec, PacketId, ReasonCode}, PState). @@ -632,6 +632,20 @@ handle_out({connack, ReasonCode}, PState = #protocol{proto_ver = ProtoVer}) -> Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), {error, Reason, ?CONNACK_PACKET(ReasonCode), PState}; +handle_out({deliver, Topic, Msg}, PState = #protocol{session = Session}) -> + case emqx_session:deliver(Topic, Msg, Session) of + {ok, Publish, NSession} -> + handle_out(Publish, PState#protocol{session = NSession}); + {ok, NSession} -> + {ok, PState#protocol{session = NSession}} + end; + +handle_out({publish, PacketId, Msg}, PState = #protocol{credentials = Credentials}) -> + Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg), + Msg1 = emqx_message:update_expiry(Msg0), + Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1), + {ok, emqx_packet:from_message(PacketId, Msg2), PState}; + handle_out({puback, PacketId, ReasonCode}, PState) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState}; %% TODO: diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index 979dc77e9..e92c55d48 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -66,7 +66,13 @@ t_ws_auth_failure(_Config) -> t_ws_other_type_frame(_Config) -> WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), {ok, _} = rfc6455_client:open(WS), - ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), + Connect = ?CONNECT_PACKET( + #mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">> + }), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(Connect)), {binary, Bin} = rfc6455_client:recv(WS), Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), {ok, Connack, <<>>, _} = raw_recv_pase(Bin),