147 lines
4.0 KiB
Erlang
147 lines
4.0 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc The Registry Centre of Gateway
|
|
-module(emqx_gateway_registry).
|
|
|
|
-include("emqx_gateway.hrl").
|
|
|
|
-behaviour(gen_server).
|
|
|
|
%% APIs
|
|
-export([
|
|
reg/2,
|
|
unreg/1,
|
|
list/0,
|
|
lookup/1
|
|
]).
|
|
|
|
%% APIs
|
|
-export([start_link/0]).
|
|
|
|
%% gen_server callbacks
|
|
-export([
|
|
init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
-export_type([descriptor/0]).
|
|
|
|
-record(state, {
|
|
reged = #{} :: #{gateway_name() => descriptor()}
|
|
}).
|
|
|
|
-type registry_options() :: [registry_option()].
|
|
|
|
-type registry_option() :: {cbkmod, atom()}.
|
|
|
|
-type descriptor() :: #{
|
|
cbkmod := atom(),
|
|
rgopts := registry_options()
|
|
}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
start_link() ->
|
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Mgmt
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec reg(gateway_name(), registry_options()) ->
|
|
ok
|
|
| {error, any()}.
|
|
reg(Name, RgOpts) ->
|
|
CbMod = proplists:get_value(cbkmod, RgOpts, Name),
|
|
Dscrptr = #{
|
|
cbkmod => CbMod,
|
|
rgopts => RgOpts
|
|
},
|
|
call({reg, Name, Dscrptr}).
|
|
|
|
-spec unreg(gateway_name()) -> ok | {error, any()}.
|
|
unreg(Name) ->
|
|
%% TODO: Checking ALL INSTANCE HAS STOPPED
|
|
call({unreg, Name}).
|
|
|
|
%% TODO:
|
|
%unreg(Name, Force) ->
|
|
% call({unreg, Name, Froce}).
|
|
|
|
%% @doc Return all registered protocol gateway implementation
|
|
-spec list() -> [{gateway_name(), descriptor()}].
|
|
list() ->
|
|
call(all).
|
|
|
|
-spec lookup(gateway_name()) -> undefined | descriptor().
|
|
lookup(Name) ->
|
|
call({lookup, Name}).
|
|
|
|
call(Req) ->
|
|
gen_server:call(?MODULE, Req, 5000).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
process_flag(trap_exit, true),
|
|
{ok, #state{reged = #{}}}.
|
|
|
|
handle_call({reg, Name, Dscrptr}, _From, State = #state{reged = Gateways}) ->
|
|
case maps:get(Name, Gateways, notfound) of
|
|
notfound ->
|
|
NGateways = maps:put(Name, Dscrptr, Gateways),
|
|
{reply, ok, State#state{reged = NGateways}};
|
|
_ ->
|
|
{reply, {error, already_existed}, State}
|
|
end;
|
|
handle_call({unreg, Name}, _From, State = #state{reged = Gateways}) ->
|
|
case maps:get(Name, Gateways, undefined) of
|
|
undefined ->
|
|
{reply, ok, State};
|
|
_ ->
|
|
_ = emqx_gateway_sup:unload_gateway(Name),
|
|
{reply, ok, State#state{reged = maps:remove(Name, Gateways)}}
|
|
end;
|
|
handle_call(all, _From, State = #state{reged = Gateways}) ->
|
|
{reply, maps:to_list(Gateways), State};
|
|
handle_call({lookup, Name}, _From, State = #state{reged = Gateways}) ->
|
|
Reply = maps:get(Name, Gateways, undefined),
|
|
{reply, Reply, State};
|
|
handle_call(Req, _From, State) ->
|
|
logger:error("Unexpected call: ~0p", [Req]),
|
|
{reply, ok, State}.
|
|
|
|
handle_cast(_Msg, State) ->
|
|
{noreply, State}.
|
|
|
|
handle_info(_Info, State) ->
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|