emqttd_ws_client:subscribe/2 api
This commit is contained in:
parent
b08c39db52
commit
07b717104b
|
@ -34,7 +34,7 @@
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/1, ws_loop/3]).
|
-export([start_link/1, ws_loop/3, subscribe/2]).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
@ -61,6 +61,9 @@ start_link(Req) ->
|
||||||
packet_opts = PktOpts,
|
packet_opts = PktOpts,
|
||||||
parser = emqttd_parser:new(PktOpts)}).
|
parser = emqttd_parser:new(PktOpts)}).
|
||||||
|
|
||||||
|
subscribe(CPid, TopicTable) ->
|
||||||
|
gen_server:cast(CPid, {subscribe, TopicTable}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Start WebSocket client.
|
%% @doc Start WebSocket client.
|
||||||
|
@ -112,6 +115,10 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
||||||
handle_call(_Req, _From, State) ->
|
handle_call(_Req, _From, State) ->
|
||||||
{reply, error, State}.
|
{reply, error, State}.
|
||||||
|
|
||||||
|
handle_cast({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
|
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
|
||||||
|
{noreply, State#client_state{proto_state = ProtoState1}, hibernate};
|
||||||
|
|
||||||
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
|
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
|
@ -136,10 +143,6 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state
|
||||||
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
||||||
{noreply, State#client_state{proto_state = ProtoState1}};
|
{noreply, State#client_state{proto_state = ProtoState1}};
|
||||||
|
|
||||||
handle_info({subscribe, TopicTable}, State = #client_state{proto_state = ProtoState}) ->
|
|
||||||
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
|
|
||||||
{noreply, State#client_state{proto_state = ProtoState1}};
|
|
||||||
|
|
||||||
handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) ->
|
handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
|
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
|
||||||
stop({shutdown, duplicate_id}, State);
|
stop({shutdown, duplicate_id}, State);
|
||||||
|
|
Loading…
Reference in New Issue