diff --git a/src/emqx.erl b/src/emqx.erl index c39dde931..54f1952f4 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -94,8 +94,11 @@ unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). -spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}). -unsubscribe(Topic, Subscriber) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)). +unsubscribe(Topic, Sub) when is_list(Sub) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic)); +unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> + {SubPid, SubId} = Subscriber, + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId). %%-------------------------------------------------------------------- %% PubSub management API @@ -116,9 +119,9 @@ topics() -> emqx_router:topics(). subscribers(Topic) -> emqx_broker:subscribers(iolist_to_binary(Topic)). --spec(subscriptions(subscriber() | string()) -> [{topic(), subopts()}]). +-spec(subscriptions(subscriber()) -> [{topic(), subopts()}]). subscriptions(Subscriber) -> - emqx_broker:subscriptions(list_to_subid(Subscriber)). + emqx_broker:subscriptions(Subscriber). -spec(subscribed(topic() | string(), subscriber()) -> boolean()). subscribed(Topic, Subscriber) -> diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 8fed40b7f..560c095cf 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -379,9 +379,18 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +insert_subscriber(Group, Topic, Subscriber) -> + Subscribers = subscribers(Topic), + case lists:member(Subscriber, Subscribers) of + false -> + ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}); + _ -> + ok + end. + do_subscribe(Group, Topic, Subscriber, SubOpts) -> ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), - ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), + insert_subscriber(Group, Topic, Subscriber), ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}). do_unsubscribe(Group, Topic, Subscriber) -> diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 2b18c348f..e76e5551c 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -18,28 +18,42 @@ -behaviour(gen_server). --export([start_link/1, start_session/1, stop/1]). +-export([start_link/1, open_session/3, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clientid, session}). +-record(state, {clean_start, client_id, client_pid}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). -start_session(CPid) -> - gen_server:call(CPid, start_session). +open_session(ClientPid, ClientId, Zone) -> + gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}). stop(CPid) -> gen_server:call(CPid, stop). init([ClientId]) -> - {ok, #state{clientid = ClientId}}. + {ok, + #state{clean_start = true, + client_id = ClientId} + }. -handle_call(start_session, _From, State = #state{clientid = ClientId}) -> - {ok, SessPid, _} = emqx_sm:start_session(true, {ClientId, undefined}), - {reply, {ok, SessPid}, State#state{session = SessPid}}; +handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> + Attrs = #{ zone => Zone, + client_id => ClientId, + client_pid => ClientPid, + clean_start => true, + username => undefined, + conn_props => undefined + }, + {ok, SessPid} = emqx_sm:open_session(Attrs), + {reply, {ok, SessPid}, State#state{ + clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call(stop, _From, State) -> {stop, normal, ok, State};