diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 34a130508..f78db2ad0 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -70,14 +70,14 @@ lookup_proc(ClientId) when is_binary(ClientId) -> %% @doc Register ClientId with Pid. -spec(register(Client :: mqtt_client()) -> ok). register(Client = #mqtt_client{client_id = ClientId}) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), - gen_server2:cast(CmPid, {register, Client}). + gen_server2:call(pick(ClientId), {register, Client}, 120000). %% @doc Unregister clientId with pid. -spec(unregister(ClientId :: binary()) -> ok). unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), - gen_server2:cast(CmPid, {unregister, ClientId, self()}). + gen_server2:cast(pick(ClientId), {unregister, ClientId, self()}). + +pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -85,16 +85,16 @@ unregister(ClientId) when is_binary(ClientId) -> init([Pool, Id, StatsFun]) -> ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, - statsfun = StatsFun, - monitors = dict:new()}}. + {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. -prioritise_call(_Req, _From, _Len, _State) -> - 1. +prioritise_call(Req, _From, _Len, _State) -> + case Req of + {register, _Client} -> 2; + _ -> 1 + end. prioritise_cast(Msg, _Len, _State) -> case Msg of - {register, _Client} -> 2; {unregister, _ClientId, _Pid} -> 9; _ -> 1 end. @@ -102,19 +102,19 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(_Msg, _Len, _State) -> 3. -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast({register, Client = #mqtt_client{client_id = ClientId, - client_pid = Pid}}, State) -> +handle_call({register, Client = #mqtt_client{client_id = ClientId, + client_pid = Pid}}, _From, State) -> case lookup_proc(ClientId) of Pid -> - {noreply, State}; + {reply, ok, State}; _ -> ets:insert(mqtt_client, Client), - {noreply, setstats(monitor_client(ClientId, Pid, State))} + {reply, ok, setstats(monitor_client(ClientId, Pid, State))} end; +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + handle_cast({unregister, ClientId, Pid}, State) -> case lookup_proc(ClientId) of Pid ->