fix unsubscribe emqx_mock_client error

This commit is contained in:
Gilbert Wong 2018-08-27 15:24:40 +08:00
parent 28e22825ba
commit 087bfe80c8
2 changed files with 27 additions and 10 deletions

View File

@ -94,8 +94,11 @@ unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
unsubscribe(Topic, Subscriber) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)).
unsubscribe(Topic, Sub) when is_list(Sub) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic));
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
{SubPid, SubId} = Subscriber,
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
%%--------------------------------------------------------------------
%% PubSub management API

View File

@ -18,28 +18,42 @@
-behaviour(gen_server).
-export([start_link/1, start_session/1, stop/1]).
-export([start_link/1, open_session/3, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {clientid, session}).
-record(state, {clean_start, client_id, client_pid}).
start_link(ClientId) ->
gen_server:start_link(?MODULE, [ClientId], []).
start_session(CPid) ->
gen_server:call(CPid, start_session).
open_session(ClientPid, ClientId, Zone) ->
gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}).
stop(CPid) ->
gen_server:call(CPid, stop).
init([ClientId]) ->
{ok, #state{clientid = ClientId}}.
{ok,
#state{clean_start = true,
client_id = ClientId}
}.
handle_call(start_session, _From, State = #state{clientid = ClientId}) ->
{ok, SessPid, _} = emqx_sm:start_session(true, {ClientId, undefined}),
{reply, {ok, SessPid}, State#state{session = SessPid}};
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
Attrs = #{ zone => Zone,
client_id => ClientId,
client_pid => ClientPid,
clean_start => true,
username => undefined,
conn_props => undefined
},
{ok, SessPid} = emqx_sm:open_session(Attrs),
{reply, {ok, SessPid}, State#state{
clean_start = true,
client_id = ClientId,
client_pid = ClientPid
}};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};