Update comments and misc fix
This commit is contained in:
parent
7c90e08f57
commit
e008d149d3
|
@ -113,7 +113,7 @@ stop_tick(TRef) ->
|
||||||
timer:cancel(TRef).
|
timer:cancel(TRef).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
|
|
@ -291,7 +291,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% Receive and parse tcp data
|
%% Receive and Parse TCP Data
|
||||||
received(<<>>, State) ->
|
received(<<>>, State) ->
|
||||||
{noreply, gc(State), hibernate};
|
{noreply, gc(State), hibernate};
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
%% @doc Serialise MQTT Packet
|
%% @doc Serialise MQTT Packet
|
||||||
-spec(serialize(mqtt_packet()) -> iolist()).
|
-spec(serialize(mqtt_packet()) -> iolist()).
|
||||||
serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
||||||
variable = Variable,
|
variable = Variable,
|
||||||
payload = Payload}) ->
|
payload = Payload}) ->
|
||||||
serialize_header(Header,
|
serialize_header(Header,
|
||||||
|
|
|
@ -129,12 +129,12 @@
|
||||||
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
|
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
|
||||||
awaiting_rel :: map(),
|
awaiting_rel :: map(),
|
||||||
|
|
||||||
%% Awaiting PUBREL timeout
|
|
||||||
await_rel_timeout = 20000 :: timeout(),
|
|
||||||
|
|
||||||
%% Max Packets that Awaiting PUBREL
|
%% Max Packets that Awaiting PUBREL
|
||||||
max_awaiting_rel = 100 :: non_neg_integer(),
|
max_awaiting_rel = 100 :: non_neg_integer(),
|
||||||
|
|
||||||
|
%% Awaiting PUBREL timeout
|
||||||
|
await_rel_timeout = 20000 :: timeout(),
|
||||||
|
|
||||||
%% Awaiting PUBREL timer
|
%% Awaiting PUBREL timer
|
||||||
await_rel_timer :: reference(),
|
await_rel_timer :: reference(),
|
||||||
|
|
||||||
|
@ -580,7 +580,7 @@ code_change(_OldVsn, Session, _Extra) ->
|
||||||
{ok, Session}.
|
{ok, Session}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Kick old client
|
%% Kickout old client
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
kick(_ClientId, undefined, _Pid) ->
|
kick(_ClientId, undefined, _Pid) ->
|
||||||
ignore;
|
ignore;
|
||||||
|
|
|
@ -37,8 +37,8 @@
|
||||||
|
|
||||||
%% @doc Handle WebSocket Request.
|
%% @doc Handle WebSocket Request.
|
||||||
handle_request(Req) ->
|
handle_request(Req) ->
|
||||||
{ok, Env} = emqttd:env(protocol),
|
{ok, ProtoEnv} = emqttd:env(protocol),
|
||||||
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
|
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
|
||||||
Parser = emqttd_parser:initial_state(PacketSize),
|
Parser = emqttd_parser:initial_state(PacketSize),
|
||||||
%% Upgrade WebSocket.
|
%% Upgrade WebSocket.
|
||||||
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
||||||
|
|
|
@ -92,14 +92,15 @@ init([Env, WsPid, Req, ReplyChannel]) ->
|
||||||
{ok, Peername} = Req:get(peername),
|
{ok, Peername} = Req:get(peername),
|
||||||
Headers = mochiweb_headers:to_list(
|
Headers = mochiweb_headers:to_list(
|
||||||
mochiweb_request:get(headers, Req)),
|
mochiweb_request:get(headers, Req)),
|
||||||
|
Conn = Req:get(connection),
|
||||||
ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
|
ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel),
|
||||||
[{ws_initial_headers, Headers} | Env]),
|
[{ws_initial_headers, Headers} | Env]),
|
||||||
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
|
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
|
||||||
EnableStats = get_value(client_enable_stats, Env, false),
|
EnableStats = get_value(client_enable_stats, Env, false),
|
||||||
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
||||||
{ok, #wsclient_state{ws_pid = WsPid,
|
{ok, #wsclient_state{connection = Conn,
|
||||||
|
ws_pid = WsPid,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
connection = Req:get(connection),
|
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
enable_stats = EnableStats,
|
enable_stats = EnableStats,
|
||||||
force_gc_count = ForceGcCount},
|
force_gc_count = ForceGcCount},
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
%% @doc Start a WebSocket Client
|
%% @doc Start a WebSocket Connection.
|
||||||
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
|
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
|
||||||
start_client(WsPid, Req, ReplyChannel) ->
|
start_client(WsPid, Req, ReplyChannel) ->
|
||||||
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
|
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
|
||||||
|
|
Loading…
Reference in New Issue