fix qos_2 message flow
This commit is contained in:
parent
31c687f26c
commit
2c8bb114f9
|
@ -43,7 +43,8 @@
|
||||||
will_msg,
|
will_msg,
|
||||||
keep_alive,
|
keep_alive,
|
||||||
awaiting_ack,
|
awaiting_ack,
|
||||||
subtopics}).
|
subtopics,
|
||||||
|
awaiting_rel}).
|
||||||
|
|
||||||
|
|
||||||
-define(FRAME_TYPE(Frame, Type),
|
-define(FRAME_TYPE(Frame, Type),
|
||||||
|
@ -86,7 +87,8 @@ handle_call({go, Sock}, _From, _State) ->
|
||||||
parse_state = emqtt_frame:initial_state(),
|
parse_state = emqtt_frame:initial_state(),
|
||||||
message_id = 1,
|
message_id = 1,
|
||||||
subtopics = [],
|
subtopics = [],
|
||||||
awaiting_ack = gb_trees:empty()})}.
|
awaiting_ack = gb_trees:empty(),
|
||||||
|
awaiting_rel = gb_trees:empty()})}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
{stop, {badmsg, Msg}, State}.
|
{stop, {badmsg, Msg}, State}.
|
||||||
|
@ -96,9 +98,17 @@ handle_info({route, Msg}, #state{socket = Sock} = State) ->
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
dup = Dup,
|
dup = Dup,
|
||||||
%message_id = MessageId,
|
message_id = MessageId,
|
||||||
payload = Payload } = Msg,
|
payload = Payload } = Msg,
|
||||||
|
|
||||||
|
SendMsgId =
|
||||||
|
if
|
||||||
|
Qos > ?QOS_0 -> MessageId;
|
||||||
|
true -> 0
|
||||||
|
end,
|
||||||
|
|
||||||
|
|
||||||
|
%TODO: FIXME LATER
|
||||||
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{
|
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{
|
||||||
type = ?PUBLISH,
|
type = ?PUBLISH,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
|
@ -106,7 +116,7 @@ handle_info({route, Msg}, #state{socket = Sock} = State) ->
|
||||||
dup = Dup },
|
dup = Dup },
|
||||||
variable = #mqtt_frame_publish{
|
variable = #mqtt_frame_publish{
|
||||||
topic_name = Topic,
|
topic_name = Topic,
|
||||||
message_id = 1},
|
message_id = SendMsgId},
|
||||||
payload = Payload },
|
payload = Payload },
|
||||||
|
|
||||||
send_frame(Sock, Frame),
|
send_frame(Sock, Frame),
|
||||||
|
@ -192,11 +202,16 @@ process_received_bytes(Bytes,
|
||||||
stop({shutdown, Error}, State)
|
stop({shutdown, Error}, State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
|
||||||
State=#state{keep_alive=KeepAlive}) ->
|
State=#state{keep_alive=KeepAlive}) ->
|
||||||
?INFO("~p", [Frame]),
|
|
||||||
emqtt_keep_alive:activate(KeepAlive),
|
emqtt_keep_alive:activate(KeepAlive),
|
||||||
process_request(Type, Frame, State).
|
case validate_frame(Type, Frame) of
|
||||||
|
ok ->
|
||||||
|
handle_retained(Type, Frame),
|
||||||
|
process_request(Type, Frame, State);
|
||||||
|
{error, Reason} ->
|
||||||
|
{err, Reason, State}
|
||||||
|
end.
|
||||||
|
|
||||||
process_request(?CONNECT,
|
process_request(?CONNECT,
|
||||||
#mqtt_frame{ variable = #mqtt_frame_connect{
|
#mqtt_frame{ variable = #mqtt_frame_connect{
|
||||||
|
@ -232,43 +247,43 @@ process_request(?CONNECT,
|
||||||
return_code = ReturnCode }}),
|
return_code = ReturnCode }}),
|
||||||
{ok, State1};
|
{ok, State1};
|
||||||
|
|
||||||
process_request(?PUBACK,
|
process_request(?PUBLISH, Frame=#mqtt_frame{
|
||||||
#mqtt_frame{
|
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
||||||
variable = #mqtt_frame_publish{ message_id = MessageId }},
|
emqtt_router:publish(make_msg(Frame)),
|
||||||
#state{awaiting_ack = Awaiting } = State) ->
|
{ok, State};
|
||||||
{ok, State #state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}};
|
|
||||||
|
|
||||||
process_request(?PUBLISH,
|
process_request(?PUBLISH,
|
||||||
#mqtt_frame{
|
Frame=#mqtt_frame{
|
||||||
fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, State) ->
|
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
||||||
{err, qos2_not_supported, State};
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
emqtt_router:publish(make_msg(Frame)),
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
process_request(?PUBLISH,
|
process_request(?PUBLISH,
|
||||||
|
Frame=#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
||||||
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
emqtt_router:publish(make_msg(Frame)),
|
||||||
|
put({msg, MsgId}, pubrec),
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBREL,
|
||||||
#mqtt_frame{
|
#mqtt_frame{
|
||||||
fixed = #mqtt_frame_fixed{ qos = Qos,
|
fixed = #mqtt_frame_fixed{ qos = ?QOS_1 },
|
||||||
retain = Retain,
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
dup = Dup },
|
State=#state{socket=Sock}) ->
|
||||||
variable = #mqtt_frame_publish{ topic_name = Topic,
|
erase({msg, MsgId}),
|
||||||
message_id = MessageId },
|
|
||||||
payload = Payload }, #state{ socket=Sock, message_id = MsgId } = State) ->
|
|
||||||
Msg = #mqtt_msg{ retain = Retain,
|
|
||||||
qos = Qos,
|
|
||||||
topic = Topic,
|
|
||||||
dup = Dup,
|
|
||||||
message_id = MessageId,
|
|
||||||
payload = Payload },
|
|
||||||
case emqtt_topic:validate({publish, Topic}) of
|
|
||||||
true ->
|
|
||||||
emqtt_router:publish(Topic, Msg),
|
|
||||||
%Retained?
|
|
||||||
retained(Retain, Topic, Msg);
|
|
||||||
false ->
|
|
||||||
?ERROR("badtopic: ~p", [Topic])
|
|
||||||
end,
|
|
||||||
send_frame(Sock,
|
send_frame(Sock,
|
||||||
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
process_request(?SUBSCRIBE,
|
process_request(?SUBSCRIBE,
|
||||||
#mqtt_frame{
|
#mqtt_frame{
|
||||||
|
@ -277,43 +292,35 @@ process_request(?SUBSCRIBE,
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
#state{socket=Sock} = State) ->
|
#state{socket=Sock} = State) ->
|
||||||
|
|
||||||
Topics1 = [Topic#mqtt_topic{qos=supported_subs_qos(Qos)}
|
|
||||||
|| Topic = #mqtt_topic{name=Name, qos=Qos} <- Topics,
|
|
||||||
emqtt_topic:validate({subscribe, Name})],
|
|
||||||
|
|
||||||
[emqtt_router:subscribe({Name, Qos}, self()) ||
|
[emqtt_router:subscribe({Name, Qos}, self()) ||
|
||||||
#mqtt_topic{name=Name, qos=Qos} <- Topics1],
|
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
||||||
|
|
||||||
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics1],
|
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
||||||
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
||||||
variable = #mqtt_frame_suback{
|
variable = #mqtt_frame_suback{
|
||||||
message_id = MessageId,
|
message_id = MessageId,
|
||||||
qos_table = GrantedQos}}),
|
qos_table = GrantedQos}}),
|
||||||
|
|
||||||
{ok, State#state{subtopics=Topics1}};
|
{ok, State};
|
||||||
|
|
||||||
process_request(?UNSUBSCRIBE,
|
process_request(?UNSUBSCRIBE,
|
||||||
#mqtt_frame{
|
#mqtt_frame{
|
||||||
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
||||||
topic_table = Topics },
|
topic_table = Topics },
|
||||||
payload = undefined}, #state{socket = Sock, client_id = ClientId,
|
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
|
||||||
subtopics = Subs0} = State) ->
|
|
||||||
|
|
||||||
|
|
||||||
[emqtt_router:unsubscribe(Name, self()) ||
|
[emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
||||||
#mqtt_topic{name=Name} <- Topics, emqtt_topic:validate(Name)],
|
|
||||||
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
||||||
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
||||||
|
|
||||||
%TODO: fixme later
|
{ok, State};
|
||||||
{ok, State #state{subtopics = Subs0}};
|
|
||||||
|
|
||||||
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
|
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
|
||||||
%Keep alive timer
|
%Keep alive timer
|
||||||
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
||||||
?INFO("~p", [KeepAlive1]),
|
|
||||||
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
||||||
{ok, State#state{keep_alive=KeepAlive1}};
|
{ok, State#state{keep_alive=KeepAlive1}};
|
||||||
|
|
||||||
|
@ -337,18 +344,16 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
||||||
will_qos = Qos,
|
will_qos = Qos,
|
||||||
will_topic = Topic,
|
will_topic = Topic,
|
||||||
will_msg = Msg }) ->
|
will_msg = Msg }) ->
|
||||||
#mqtt_msg{ retain = Retain,
|
#mqtt_msg{retain = Retain,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
dup = false,
|
dup = false,
|
||||||
payload = Msg }.
|
payload = Msg }.
|
||||||
|
|
||||||
supported_subs_qos(?QOS_0) -> ?QOS_0;
|
send_will_msg(#state{will_msg = undefined}) ->
|
||||||
supported_subs_qos(?QOS_1) -> ?QOS_1;
|
ignore;
|
||||||
supported_subs_qos(?QOS_2) -> ?QOS_1.
|
send_will_msg(#state{will_msg = WillMsg }) ->
|
||||||
|
emqtt_router:publish(WillMsg).
|
||||||
send_will(#state{ will_msg = WillMsg }) ->
|
|
||||||
?INFO("willmsg: ~p~n", [WillMsg]).
|
|
||||||
|
|
||||||
send_frame(Sock, Frame) ->
|
send_frame(Sock, Frame) ->
|
||||||
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
||||||
|
@ -357,7 +362,7 @@ send_frame(Sock, Frame) ->
|
||||||
network_error(_Reason,
|
network_error(_Reason,
|
||||||
State = #state{ conn_name = ConnStr}) ->
|
State = #state{ conn_name = ConnStr}) ->
|
||||||
?INFO("MQTT detected network error for ~p~n", [ConnStr]),
|
?INFO("MQTT detected network error for ~p~n", [ConnStr]),
|
||||||
send_will(State),
|
send_will_msg(State),
|
||||||
% todo: flush channel after publish
|
% todo: flush channel after publish
|
||||||
stop({shutdown, conn_closed}, State).
|
stop({shutdown, conn_closed}, State).
|
||||||
|
|
||||||
|
@ -386,10 +391,53 @@ valid_client_id(ClientId) ->
|
||||||
ClientIdLen = length(ClientId),
|
ClientIdLen = length(ClientId),
|
||||||
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
|
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
|
||||||
|
|
||||||
retained(false, _Topic, _Msg) ->
|
handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) ->
|
||||||
ignore;
|
ignore;
|
||||||
retained(true, Topic, #mqtt_msg{payload = <<>>}) ->
|
|
||||||
emqtt_retained:delete(Topic);
|
handle_retained(?PUBLISH, #mqtt_frame{
|
||||||
retained(true, Topic, Msg) ->
|
fixed = #mqtt_frame_fixed{retain = true},
|
||||||
emqtt_retained:insert(Topic, Msg).
|
variable = #mqtt_frame_publish{topic_name = Topic},
|
||||||
|
payload= <<>> }) ->
|
||||||
|
emqtt_retained:delete(Topic);
|
||||||
|
|
||||||
|
handle_retained(?PUBLISH, Frame=#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{retain = true},
|
||||||
|
variable = #mqtt_frame_publish{topic_name = Topic}}) ->
|
||||||
|
emqtt_retained:insert(Topic, make_msg(Frame));
|
||||||
|
|
||||||
|
handle_retained(_, _) ->
|
||||||
|
ignore.
|
||||||
|
|
||||||
|
validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) ->
|
||||||
|
case emqtt_topic:validate({publish, Topic}) of
|
||||||
|
true -> ok;
|
||||||
|
false -> {error, badtopic}
|
||||||
|
end;
|
||||||
|
|
||||||
|
validate_frame(?UNSUBSCRIBE, Frame) ->
|
||||||
|
validate_frame(?SUBSCRIBE, Frame);
|
||||||
|
|
||||||
|
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
||||||
|
ErrTopics = [Topic || Topic <- Topics, not emqtt_topic:validate({subscribe, Topic})],
|
||||||
|
case ErrTopics of
|
||||||
|
[] -> ok;
|
||||||
|
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|
||||||
|
end;
|
||||||
|
|
||||||
|
validate_frame(_Type, _Frame) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
make_msg(#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = Qos,
|
||||||
|
retain = Retain,
|
||||||
|
dup = Dup},
|
||||||
|
variable = #mqtt_frame_publish{topic_name = Topic,
|
||||||
|
message_id = MessageId},
|
||||||
|
payload = Payload}) ->
|
||||||
|
#mqtt_msg{retain = Retain,
|
||||||
|
qos = Qos,
|
||||||
|
topic = Topic,
|
||||||
|
dup = Dup,
|
||||||
|
message_id = MessageId,
|
||||||
|
payload = Payload}.
|
||||||
|
|
||||||
|
|
|
@ -102,7 +102,7 @@ parse_frame(Bin, #mqtt_frame_fixed{ type = Type,
|
||||||
wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName,
|
wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName,
|
||||||
message_id = MessageId },
|
message_id = MessageId },
|
||||||
Payload, Rest);
|
Payload, Rest);
|
||||||
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
|
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
|
||||||
<<MessageId:16/big>> = FrameBin,
|
<<MessageId:16/big>> = FrameBin,
|
||||||
wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest);
|
wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest);
|
||||||
{Subs, <<FrameBin:Length/binary, Rest/binary>>}
|
{Subs, <<FrameBin:Length/binary, Rest/binary>>}
|
||||||
|
|
Loading…
Reference in New Issue