Merge pull request #1747 from Gilbert-Wong/emqx30
fix subscribe bugs and update emqx_mock_client
This commit is contained in:
commit
a183693ac8
11
src/emqx.erl
11
src/emqx.erl
|
@ -94,8 +94,11 @@ unsubscribe(Topic) ->
|
||||||
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
|
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
|
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
|
||||||
unsubscribe(Topic, Subscriber) ->
|
unsubscribe(Topic, Sub) when is_list(Sub) ->
|
||||||
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)).
|
emqx_broker:unsubscribe(iolist_to_binary(Topic));
|
||||||
|
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
|
||||||
|
{SubPid, SubId} = Subscriber,
|
||||||
|
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% PubSub management API
|
%% PubSub management API
|
||||||
|
@ -116,9 +119,9 @@ topics() -> emqx_router:topics().
|
||||||
subscribers(Topic) ->
|
subscribers(Topic) ->
|
||||||
emqx_broker:subscribers(iolist_to_binary(Topic)).
|
emqx_broker:subscribers(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
-spec(subscriptions(subscriber() | string()) -> [{topic(), subopts()}]).
|
-spec(subscriptions(subscriber()) -> [{topic(), subopts()}]).
|
||||||
subscriptions(Subscriber) ->
|
subscriptions(Subscriber) ->
|
||||||
emqx_broker:subscriptions(list_to_subid(Subscriber)).
|
emqx_broker:subscriptions(Subscriber).
|
||||||
|
|
||||||
-spec(subscribed(topic() | string(), subscriber()) -> boolean()).
|
-spec(subscribed(topic() | string(), subscriber()) -> boolean()).
|
||||||
subscribed(Topic, Subscriber) ->
|
subscribed(Topic, Subscriber) ->
|
||||||
|
|
|
@ -379,9 +379,18 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
insert_subscriber(Group, Topic, Subscriber) ->
|
||||||
|
Subscribers = subscribers(Topic),
|
||||||
|
case lists:member(Subscriber, Subscribers) of
|
||||||
|
false ->
|
||||||
|
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)});
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
do_subscribe(Group, Topic, Subscriber, SubOpts) ->
|
do_subscribe(Group, Topic, Subscriber, SubOpts) ->
|
||||||
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
||||||
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
|
insert_subscriber(Group, Topic, Subscriber),
|
||||||
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
|
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
|
||||||
|
|
||||||
do_unsubscribe(Group, Topic, Subscriber) ->
|
do_unsubscribe(Group, Topic, Subscriber) ->
|
||||||
|
|
|
@ -18,28 +18,42 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-export([start_link/1, start_session/1, stop/1]).
|
-export([start_link/1, open_session/3, stop/1]).
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {clientid, session}).
|
-record(state, {clean_start, client_id, client_pid}).
|
||||||
|
|
||||||
start_link(ClientId) ->
|
start_link(ClientId) ->
|
||||||
gen_server:start_link(?MODULE, [ClientId], []).
|
gen_server:start_link(?MODULE, [ClientId], []).
|
||||||
|
|
||||||
start_session(CPid) ->
|
open_session(ClientPid, ClientId, Zone) ->
|
||||||
gen_server:call(CPid, start_session).
|
gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}).
|
||||||
|
|
||||||
stop(CPid) ->
|
stop(CPid) ->
|
||||||
gen_server:call(CPid, stop).
|
gen_server:call(CPid, stop).
|
||||||
|
|
||||||
init([ClientId]) ->
|
init([ClientId]) ->
|
||||||
{ok, #state{clientid = ClientId}}.
|
{ok,
|
||||||
|
#state{clean_start = true,
|
||||||
|
client_id = ClientId}
|
||||||
|
}.
|
||||||
|
|
||||||
handle_call(start_session, _From, State = #state{clientid = ClientId}) ->
|
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
||||||
{ok, SessPid, _} = emqx_sm:start_session(true, {ClientId, undefined}),
|
Attrs = #{ zone => Zone,
|
||||||
{reply, {ok, SessPid}, State#state{session = SessPid}};
|
client_id => ClientId,
|
||||||
|
client_pid => ClientPid,
|
||||||
|
clean_start => true,
|
||||||
|
username => undefined,
|
||||||
|
conn_props => undefined
|
||||||
|
},
|
||||||
|
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
||||||
|
{reply, {ok, SessPid}, State#state{
|
||||||
|
clean_start = true,
|
||||||
|
client_id = ClientId,
|
||||||
|
client_pid = ClientPid
|
||||||
|
}};
|
||||||
|
|
||||||
handle_call(stop, _From, State) ->
|
handle_call(stop, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
|
Loading…
Reference in New Issue