emqx/test/ws_client.erl

76 lines
1.8 KiB
Erlang

-module(ws_client).
-export([
start_link/0,
start_link/1,
send_binary/2,
send_ping/2,
recv/2,
recv/1,
stop/1
]).
-export([
init/2,
websocket_handle/3,
websocket_info/3,
websocket_terminate/3
]).
%% Callback state carried through the websocket_* callbacks of this module.
-record(state, {
%% Frames that arrived while no caller was waiting for one.
buffer = [] :: list(),
%% Pid that asked for the next frame via {recv, Pid}; undefined when nobody
%% is currently waiting.
waiting = undefined :: undefined | pid()
}).
%% @doc Start a client against the default endpoint
%% "ws://localhost:8083/mqtt". Returns whatever start_link/1 returns.
start_link() ->
    DefaultUrl = "ws://localhost:8083/mqtt",
    start_link(DefaultUrl).
%% @doc Start a client for `Url' with this module as the callback module and
%% an empty argument list. The "mqtt" value is passed in a
%% Sec-Websocket-Protocol extra header. The result of
%% websocket_client:start_link/4 is returned unchanged.
start_link(Url) ->
    SubProtocolHeader = {"Sec-Websocket-Protocol", "mqtt"},
    Options = [{extra_headers, [SubProtocolHeader]}],
    websocket_client:start_link(Url, ?MODULE, [], Options).
%% @doc Send the bare atom `stop' to the client process `Pid'; the
%% websocket_info(stop, ...) clause of this module answers it with a close.
%% Returns the atom `stop' (the message that was sent).
stop(Pid) ->
    erlang:send(Pid, stop).
%% @doc Hand a `{binary, Msg}' frame to the client process `Pid' through
%% websocket_client:cast/2 and return that call's result.
send_binary(Pid, Msg) ->
    Frame = {binary, Msg},
    websocket_client:cast(Pid, Frame).
%% @doc Hand a `{ping, Msg}' frame to the client process `Pid' through
%% websocket_client:cast/2 and return that call's result.
send_ping(Pid, Msg) ->
    Frame = {ping, Msg},
    websocket_client:cast(Pid, Frame).
%% @doc Same as recv/2 with a timeout of 5000 milliseconds.
recv(Pid) ->
    DefaultTimeout = 5000,
    recv(Pid, DefaultTimeout).
%% @doc Ask the client process `Pid' for a frame and wait up to `Timeout'
%% milliseconds for an answer.
%%
%% Sends `{recv, self()}' to `Pid', then returns the first message found in
%% the caller's mailbox, or the atom `error' if none arrives in time.
%%
%% NOTE(review): the receive is unselective, so any unrelated message already
%% queued for the caller is returned as if it were a frame. Callers should
%% keep their mailbox clean around this call.
recv(Pid, Timeout) ->
    Caller = self(),
    erlang:send(Pid, {recv, Caller}),
    receive
        Reply ->
            Reply
    after Timeout ->
        error
    end.
%% @doc Callback run when the client starts: both arguments are ignored and
%% the state begins with an empty buffer and no waiting caller (the record
%% defaults).
init(_Args, _WSReq) ->
    InitialState = #state{},
    {ok, InitialState}.
%% @doc Callback invoked for every frame received from the server.
%%
%% If no caller is waiting (`waiting = undefined'), the frame is queued in
%% `buffer'. Otherwise it is sent straight to the waiting pid and the waiting
%% slot is cleared. Always returns `{ok, NewState}'.
websocket_handle(Frame, _WSReq, State = #state{waiting = undefined, buffer = Buffer}) ->
    logger:info("Client received frame~p", [Frame]),
    %% Append rather than prepend: websocket_info({recv, _}, ...) pops the
    %% head of the buffer, so the oldest frame must sit at the front for
    %% frames to be handed out in arrival order. Prepending made the buffer
    %% LIFO whenever more than one frame was queued. The O(n) append is
    %% acceptable for the handful of frames a test client buffers.
    {ok, State#state{buffer = Buffer ++ [Frame]}};
websocket_handle(Frame, _WSReq, State = #state{waiting = From}) ->
    logger:info("Client received frame~p", [Frame]),
    %% A caller is blocked in recv/2: deliver directly, bypassing the buffer.
    From ! Frame,
    {ok, State#state{waiting = undefined}}.
%% @doc Callback for plain Erlang messages sent to the client process.
%%
%% Handled messages:
%%   `stop'              - reply `{close, <<>>, State}'.
%%   `{recv, From}'      - if the buffer is empty, remember `From' in the
%%                         `waiting' field; otherwise send the head of the
%%                         buffer to `From' and keep the rest.
%%   `{send_text, Text}' - pass `{text, Text}' to websocket_client:send/2.
%% There is no clause for any other message.
websocket_info(stop, _WSReq, State) ->
    {close, <<>>, State};
websocket_info({recv, Requester}, _WSReq, State = #state{buffer = []}) ->
    %% Nothing queued yet: park the requester until a frame arrives.
    Parked = State#state{waiting = Requester},
    {ok, Parked};
websocket_info({recv, Requester}, _WSReq, State = #state{buffer = [Head | Tail]}) ->
    Requester ! Head,
    Remaining = State#state{buffer = Tail},
    {ok, Remaining};
websocket_info({send_text, Text}, WSReq, State) ->
    TextFrame = {text, Text},
    websocket_client:send(TextFrame, WSReq),
    {ok, State}.
%% @doc Callback run when the connection ends: prints the close frame and the
%% final state to standard output, then returns `ok'.
websocket_terminate(Close, _WSReq, State) ->
    Args = [Close, State],
    io:format("Websocket closed with frame ~p and state ~p", Args),
    ok.