Fix the function_clause error when publishing QoS2 message

This commit is contained in:
Feng Lee 2019-06-25 17:35:58 +08:00
parent 67b7266438
commit 9ee0a4d171
2 changed files with 28 additions and 8 deletions

View File

@ -384,7 +384,7 @@ process(?CONNECT_PACKET(
PState4 = PState3#protocol{session = Session, connected = true, PState4 = PState3#protocol{session = Session, connected = true,
credentials = keepsafety(Credentials0)}, credentials = keepsafety(Credentials0)},
ok = emqx_cm:register_channel(client_id(PState4)), ok = emqx_cm:register_channel(client_id(PState4)),
true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), ok = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
%% Start keepalive %% Start keepalive
start_keepalive(Keepalive, PState4), start_keepalive(Keepalive, PState4),
%% Success %% Success
@ -471,7 +471,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
{ok, TopicFilters} -> {ok, TopicFilters} ->
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0),
{ok, ReasonCodes, NSession} = emqx_session:subscribe(TopicFilters1, Session), {ok, ReasonCodes, NSession} = emqx_session:subscribe(Credentials, TopicFilters1, Session),
handle_out({suback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); handle_out({suback, PacketId, ReasonCodes}, PState#protocol{session = NSession});
{error, TopicFilters} -> {error, TopicFilters} ->
{SubTopics, ReasonCodes} = {SubTopics, ReasonCodes} =
@ -490,7 +490,7 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters), TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters),
{ok, ReasonCodes, NSession} = emqx_session:unsubscribe(TopicFilters1, Session), {ok, ReasonCodes, NSession} = emqx_session:unsubscribe(Credentials, TopicFilters1, Session),
handle_out({unsuback, PacketId, ReasonCodes}, PState#protocol{session = NSession}); handle_out({unsuback, PacketId, ReasonCodes}, PState#protocol{session = NSession});
process(?PACKET(?PINGREQ), PState) -> process(?PACKET(?PINGREQ), PState) ->
@ -555,16 +555,16 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
puback(?QOS_0, _PacketId, _Result, PState) -> puback(?QOS_0, _PacketId, _Result, PState) ->
{ok, PState}; {ok, PState};
puback(?QOS_1, PacketId, {ok, []}, PState) -> puback(?QOS_1, PacketId, [], PState) ->
handle_out({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); handle_out({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
%%TODO: calc the deliver count? %%TODO: calc the deliver count?
puback(?QOS_1, PacketId, {ok, _Result}, PState) -> puback(?QOS_1, PacketId, Result, PState) when is_list(Result) ->
handle_out({puback, PacketId, ?RC_SUCCESS}, PState); handle_out({puback, PacketId, ?RC_SUCCESS}, PState);
puback(?QOS_1, PacketId, {error, ReasonCode}, PState) -> puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
handle_out({puback, PacketId, ReasonCode}, PState); handle_out({puback, PacketId, ReasonCode}, PState);
puback(?QOS_2, PacketId, {ok, []}, PState) -> puback(?QOS_2, PacketId, [], PState) ->
handle_out({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState); handle_out({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
puback(?QOS_2, PacketId, {ok, _Result}, PState) -> puback(?QOS_2, PacketId, Result, PState) when is_list(Result) ->
handle_out({pubrec, PacketId, ?RC_SUCCESS}, PState); handle_out({pubrec, PacketId, ?RC_SUCCESS}, PState);
puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
handle_out({pubrec, PacketId, ReasonCode}, PState). handle_out({pubrec, PacketId, ReasonCode}, PState).
@ -632,6 +632,20 @@ handle_out({connack, ReasonCode}, PState = #protocol{proto_ver = ProtoVer}) ->
Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
{error, Reason, ?CONNACK_PACKET(ReasonCode), PState}; {error, Reason, ?CONNACK_PACKET(ReasonCode), PState};
handle_out({deliver, Topic, Msg}, PState = #protocol{session = Session}) ->
case emqx_session:deliver(Topic, Msg, Session) of
{ok, Publish, NSession} ->
handle_out(Publish, PState#protocol{session = NSession});
{ok, NSession} ->
{ok, PState#protocol{session = NSession}}
end;
handle_out({publish, PacketId, Msg}, PState = #protocol{credentials = Credentials}) ->
Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg),
Msg1 = emqx_message:update_expiry(Msg0),
Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1),
{ok, emqx_packet:from_message(PacketId, Msg2), PState};
handle_out({puback, PacketId, ReasonCode}, PState) -> handle_out({puback, PacketId, ReasonCode}, PState) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState}; {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState};
%% TODO: %% TODO:

View File

@ -66,7 +66,13 @@ t_ws_auth_failure(_Config) ->
t_ws_other_type_frame(_Config) -> t_ws_other_type_frame(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS), {ok, _} = rfc6455_client:open(WS),
ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), Connect = ?CONNECT_PACKET(
#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"admin">>,
password = <<"public">>
}),
ok = rfc6455_client:send_binary(WS, raw_send_serialize(Connect)),
{binary, Bin} = rfc6455_client:recv(WS), {binary, Bin} = rfc6455_client:recv(WS),
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
{ok, Connack, <<>>, _} = raw_recv_pase(Bin), {ok, Connack, <<>>, _} = raw_recv_pase(Bin),