commit
e8214e43ff
|
@ -59,7 +59,7 @@ make(From, Topic, Payload) ->
|
||||||
Topic :: binary(),
|
Topic :: binary(),
|
||||||
Payload :: binary().
|
Payload :: binary().
|
||||||
make(From, Qos, Topic, Payload) ->
|
make(From, Qos, Topic, Payload) ->
|
||||||
#mqtt_message{msgid = msgid(Qos),
|
#mqtt_message{msgid = msgid(?QOS_I(Qos)),
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
from = From,
|
from = From,
|
||||||
qos = ?QOS_I(Qos),
|
qos = ?QOS_I(Qos),
|
||||||
|
|
|
@ -51,14 +51,14 @@ start_link(I) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
submit(Fun) ->
|
submit(Fun) ->
|
||||||
gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity).
|
gen_server:call(gproc_pool:pick_worker(pooler), {submit, Fun}, infinity).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Submit work to pooler asynchronously
|
%% @doc Submit work to pooler asynchronously
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
async_submit(Fun) ->
|
async_submit(Fun) ->
|
||||||
gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}).
|
gen_server:cast(gproc_pool:pick_worker(pooler), {async_submit, Fun}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
|
|
@ -110,7 +110,7 @@
|
||||||
%% Max Packets that Awaiting PUBREL
|
%% Max Packets that Awaiting PUBREL
|
||||||
max_awaiting_rel = 100,
|
max_awaiting_rel = 100,
|
||||||
|
|
||||||
%% Awaiting timers for ack, rel and comp.
|
%% Awaiting timers for ack, rel.
|
||||||
awaiting_ack :: map(),
|
awaiting_ack :: map(),
|
||||||
|
|
||||||
%% Retries to resend the unacked messages
|
%% Retries to resend the unacked messages
|
||||||
|
@ -354,13 +354,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
%% Dequeue pending messages
|
%% Dequeue pending messages
|
||||||
noreply(dequeue(Session2));
|
noreply(dequeue(Session2));
|
||||||
|
|
||||||
%% PUBRAC
|
%% PUBACK
|
||||||
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
|
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) ->
|
||||||
case maps:find(PktId, Awaiting) of
|
case maps:find(PktId, AwaitingAck) of
|
||||||
{ok, {_, TRef}} ->
|
{ok, {_, TRef}} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
Session1 = acked(PktId, Session),
|
noreply(dequeue(acked(PktId, Session)));
|
||||||
noreply(dequeue(Session1));
|
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
|
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
|
||||||
noreply(Session)
|
noreply(Session)
|
||||||
|
@ -375,7 +374,8 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
|
||||||
{ok, {_, TRef}} ->
|
{ok, {_, TRef}} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
|
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
|
||||||
Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}),
|
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
|
||||||
|
Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
|
||||||
noreply(dequeue(Session1));
|
noreply(dequeue(Session1));
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
|
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
|
||||||
|
@ -564,10 +564,11 @@ dequeue(Session) ->
|
||||||
|
|
||||||
dequeue2(Session = #session{message_queue = Q}) ->
|
dequeue2(Session = #session{message_queue = Q}) ->
|
||||||
case emqttd_mqueue:out(Q) of
|
case emqttd_mqueue:out(Q) of
|
||||||
{empty, _Q} -> Session;
|
{empty, _Q} ->
|
||||||
|
Session;
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
Session1 = deliver(Msg, Session#session{message_queue = Q1}),
|
%% dequeue more
|
||||||
dequeue(Session1) %% dequeue more
|
dequeue(deliver(Msg, Session#session{message_queue = Q1}))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
|
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
|
||||||
|
@ -608,8 +609,8 @@ acked(PktId, Session = #session{client_id = ClientId,
|
||||||
false ->
|
false ->
|
||||||
lager:error("Session(~s) cannot find acked message: ~p", [PktId])
|
lager:error("Session(~s) cannot find acked message: ~p", [PktId])
|
||||||
end,
|
end,
|
||||||
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
Session#session{awaiting_ack = maps:remove(PktId, Awaiting),
|
||||||
awaiting_ack = maps:remove(PktId, Awaiting)}.
|
inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.
|
||||||
|
|
||||||
next_packet_id(Session = #session{packet_id = 16#ffff}) ->
|
next_packet_id(Session = #session{packet_id = 16#ffff}) ->
|
||||||
Session#session{packet_id = 1};
|
Session#session{packet_id = 1};
|
||||||
|
@ -627,3 +628,4 @@ cancel_timer(Ref) ->
|
||||||
|
|
||||||
noreply(State) ->
|
noreply(State) ->
|
||||||
{noreply, State, hibernate}.
|
{noreply, State, hibernate}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue