diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl index 997d6bc72..9099ecd4d 100644 --- a/apps/emqx_gateway/include/emqx_gateway.hrl +++ b/apps/emqx_gateway/include/emqx_gateway.hrl @@ -18,11 +18,11 @@ -define(EMQX_GATEWAY_HRL, 1). -type instance_id() :: atom(). --type gateway_type() :: atom(). +-type gateway_name() :: atom(). %% @doc The Gateway defination -type gateway() :: - #{ type := gateway_type() + #{ name := gateway_name() , descr => binary() | undefined %% Appears only in creating or detailed info , rawconf => map() diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index a27db934f..b3f7b640a 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -48,18 +48,18 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_gateway_load(_Gateway = #{type := GwType, +on_gateway_load(_Gateway = #{name := GwName, rawconf := RawConf }, Ctx) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(GwType, Ctx, Lis) + start_listener(GwName, Ctx, Lis) end, Listeners), {ok, ListenerPids, #{ctx => Ctx}}. on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwType = maps:get(type, NewGateway), + GwName = maps:get(name, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? @@ -69,37 +69,37 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> Class : Reason : Stk -> logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [GwType, Class, Reason, Stk]), + [GwName, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_gateway_unload(_Gateway = #{ type := GwType, +on_gateway_unload(_Gateway = #{ name := GwName, rawconf := RawConf }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(GwType, Lis) + stop_listener(GwName, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]), + [GwName, Type, ListenOnStr]), Pid; {error, Reason} -> ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]), + [GwName, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwType, Type), +start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_coap_frame, @@ -114,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). -name(GwType, Type) -> - list_to_atom(lists:concat([GwType, ":", Type])). +name(GwName, Type) -> + list_to_atom(lists:concat([GwName, ":", Type])). -stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]); + [GwName, Type, ListenOnStr]); {error, Reason} -> ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]) + [GwName, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwType, Type), +stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 2835b91ff..01d5897ff 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -30,7 +30,7 @@ ]). -spec registered_gateway() -> - [{gateway_type(), emqx_gateway_registry:descriptor()}]. + [{gateway_name(), emqx_gateway_registry:descriptor()}]. registered_gateway() -> emqx_gateway_registry:list(). @@ -41,35 +41,35 @@ registered_gateway() -> list() -> emqx_gateway_sup:list_gateway_insta(). --spec load(gateway_type(), map()) +-spec load(gateway_name(), map()) -> {ok, pid()} | {error, any()}. -load(GwType, RawConf) -> - Gateway = #{ type => GwType +load(Name, RawConf) -> + Gateway = #{ name => Name , descr => undefined , rawconf => RawConf }, emqx_gateway_sup:load_gateway(Gateway). --spec unload(gateway_type()) -> ok | {error, any()}. -unload(GwType) -> - emqx_gateway_sup:unload_gateway(GwType). +-spec unload(gateway_name()) -> ok | {error, any()}. +unload(Name) -> + emqx_gateway_sup:unload_gateway(Name). --spec lookup(gateway_type()) -> gateway() | undefined. -lookup(GwType) -> - emqx_gateway_sup:lookup_gateway(GwType). +-spec lookup(gateway_name()) -> gateway() | undefined. +lookup(Name) -> + emqx_gateway_sup:lookup_gateway(Name). -spec update(gateway()) -> ok | {error, any()}. update(NewGateway) -> emqx_gateway_sup:update_gateway(NewGateway). --spec start(gateway_type()) -> ok | {error, any()}. -start(GwType) -> - emqx_gateway_sup:start_gateway_insta(GwType). +-spec start(gateway_name()) -> ok | {error, any()}. +start(Name) -> + emqx_gateway_sup:start_gateway_insta(Name). --spec stop(gateway_type()) -> ok | {error, any()}. -stop(GwType) -> - emqx_gateway_sup:stop_gateway_insta(GwType). +-spec stop(gateway_name()) -> ok | {error, any()}. +stop(Name) -> + emqx_gateway_sup:stop_gateway_insta(Name). %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index fbd559424..2430b38e6 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -50,30 +50,31 @@ is_cmd(Fun) -> %% Cmds gateway(["list"]) -> - lists:foreach(fun(#{type := Type, status := Status}) -> - %% FIXME: Get the real running status - emqx_ctl:print("Gateway(type=~s, status=~s~n", - [Type, Status]) + lists:foreach(fun(#{name := Name} = Gateway) -> + %% XXX: More infos: listeners?, connected? + Status = maps:get(status, Gateway, stopped), + emqx_ctl:print("Gateway(name=~s, status=~s)~n", + [Name, Status]) end, emqx_gateway:list()); -gateway(["lookup", GwType]) -> - case emqx_gateway:lookup(atom(GwType)) of +gateway(["lookup", Name]) -> + case emqx_gateway:lookup(atom(Name)) of undefined -> emqx_ctl:print("undefined~n"); Info -> emqx_ctl:print("~p~n", [Info]) end; -gateway(["stop", GwType]) -> - case emqx_gateway:stop(atom(GwType)) of +gateway(["stop", Name]) -> + case emqx_gateway:stop(atom(Name)) of ok -> emqx_ctl:print("ok~n"); {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason]) end; -gateway(["start", GwType]) -> - case emqx_gateway:start(atom(GwType)) of +gateway(["start", Name]) -> + case emqx_gateway:start(atom(Name)) of ok -> emqx_ctl:print("ok~n"); {error, Reason} -> @@ -84,60 +85,60 @@ gateway(_) -> %% TODO: create/remove APIs emqx_ctl:usage([ {"gateway list", "List all gateway"} - , {"gateway lookup ", - "Looup a gateway detailed informations"} - , {"gateway stop ", + , {"gateway lookup ", + "Lookup a gateway detailed informations"} + , {"gateway stop ", "Stop a gateway instance"} - , {"gateway start ", + , {"gateway start ", "Start a gateway instance"} ]). 'gateway-registry'(["list"]) -> lists:foreach( - fun({GwType, #{cbkmod := CbMod}}) -> - emqx_ctl:print("Registered Type: ~s, Callback Module: ~s~n", [GwType, CbMod]) + fun({Name, #{cbkmod := CbMod}}) -> + emqx_ctl:print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod]) end, emqx_gateway_registry:list()); 'gateway-registry'(_) -> emqx_ctl:usage([ {"gateway-registry list", - "List all registered gateway types"} + "List all registered gateways"} ]). -'gateway-clients'(["list", Type]) -> - InfoTab = emqx_gateway_cm:tabname(info, Type), +'gateway-clients'(["list", Name]) -> + InfoTab = emqx_gateway_cm:tabname(info, Name), dump(InfoTab, client); -'gateway-clients'(["lookup", Type, ClientId]) -> - ChanTab = emqx_gateway_cm:tabname(chan, Type), +'gateway-clients'(["lookup", Name, ClientId]) -> + ChanTab = emqx_gateway_cm:tabname(chan, Name), case ets:lookup(ChanTab, bin(ClientId)) of [] -> emqx_ctl:print("Not Found.~n"); [Chann] -> - InfoTab = emqx_gateway_cm:tabname(info, Type), + InfoTab = emqx_gateway_cm:tabname(info, Name), [ChannInfo] = ets:lookup(InfoTab, Chann), print({client, ChannInfo}) end; -'gateway-clients'(["kick", Type, ClientId]) -> - case emqx_gateway_cm:kick_session(Type, bin(ClientId)) of +'gateway-clients'(["kick", Name, ClientId]) -> + case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of ok -> emqx_ctl:print("ok~n"); _ -> emqx_ctl:print("Not Found.~n") end; 'gateway-clients'(_) -> - emqx_ctl:usage([ {"gateway-clients list ", - "List all clients for a type of gateway"} - , {"gateway-clients lookup ", + emqx_ctl:usage([ {"gateway-clients list ", + "List all clients for a gateway"} + , {"gateway-clients lookup ", "Lookup the Client Info for specified client"} - , {"gateway-clients kick ", + , {"gateway-clients kick ", "Kick out a client"} ]). -'gateway-metrics'([GatewayType]) -> - Tab = emqx_gateway_metrics:tabname(GatewayType), +'gateway-metrics'([Name]) -> + Tab = emqx_gateway_metrics:tabname(Name), case ets:info(Tab) of undefined -> - emqx_ctl:print("Bad Gateway Type.~n"); + emqx_ctl:print("Bad Gateway Name.~n"); _ -> lists:foreach( fun({K, V}) -> @@ -146,7 +147,7 @@ gateway(_) -> end; 'gateway-metrics'(_) -> - emqx_ctl:usage([ {"gateway-metrics ", + emqx_ctl:usage([ {"gateway-metrics ", "List all metrics for a gateway"} ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 83142abb1..7a7ad055d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -61,13 +61,13 @@ ]). -record(state, { - type :: atom(), %% Gateway Type - locker :: pid(), %% ClientId Locker for CM - registry :: pid(), %% ClientId Registry server + gwname :: gateway_name(), %% Gateway Name + locker :: pid(), %% ClientId Locker for CM + registry :: pid(), %% ClientId Registry server chan_pmon :: emqx_pmon:pmon() }). --type option() :: {type, gateway_type()}. +-type option() :: {gwname, gateway_name()}. -type options() :: list(option()). -define(T_TAKEOVER, 15000). @@ -79,142 +79,147 @@ -spec start_link(options()) -> {ok, pid()} | ignore | {error, any()}. start_link(Options) -> - Type = proplists:get_value(type, Options), - gen_server:start_link({local, procname(Type)}, ?MODULE, Options, []). + GwName = proplists:get_value(gwname, Options), + gen_server:start_link({local, procname(GwName)}, ?MODULE, Options, []). -procname(Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_cm'])). +procname(GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_cm'])). --spec cmtabs(Type :: atom()) -> {ChanTab :: atom(), - ConnTab :: atom(), - ChannInfoTab :: atom()}. -cmtabs(Type) -> - { tabname(chan, Type) %% Client Tabname; Record: {ClientId, Pid} - , tabname(conn, Type) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod} - , tabname(info, Type) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats} +-spec cmtabs(GwName :: gateway_name()) + -> {ChanTab :: atom(), + ConnTab :: atom(), + ChannInfoTab :: atom()}. +cmtabs(GwName) -> + { tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid} + , tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod} + , tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats} }. -tabname(chan, Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_channel'])); -tabname(conn, Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_conn'])); -tabname(info, Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_info'])). +tabname(chan, GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel'])); +tabname(conn, GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_conn'])); +tabname(info, GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_info'])). -lockername(Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_locker'])). +lockername(GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_locker'])). --spec register_channel(atom(), binary(), pid(), emqx_types:conninfo()) -> ok. -register_channel(Type, ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> +-spec register_channel(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:conninfo()) -> ok. +register_channel(GwName, ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> Chan = {ClientId, ChanPid}, - true = ets:insert(tabname(chan, Type), Chan), - true = ets:insert(tabname(conn, Type), {Chan, ConnMod}), - ok = emqx_gateway_cm_registry:register_channel(Type, Chan), - cast(procname(Type), {registered, Chan}). + true = ets:insert(tabname(chan, GwName), Chan), + true = ets:insert(tabname(conn, GwName), {Chan, ConnMod}), + ok = emqx_gateway_cm_registry:register_channel(GwName, Chan), + cast(procname(GwName), {registered, Chan}). %% @doc Unregister a channel. --spec unregister_channel(atom(), emqx_types:clientid()) -> ok. -unregister_channel(Type, ClientId) when is_binary(ClientId) -> - true = do_unregister_channel(Type, {ClientId, self()}, cmtabs(Type)), +-spec unregister_channel(gateway_name(), emqx_types:clientid()) -> ok. +unregister_channel(GwName, ClientId) when is_binary(ClientId) -> + true = do_unregister_channel(GwName, {ClientId, self()}, cmtabs(GwName)), ok. %% @doc Insert/Update the channel info and stats --spec insert_channel_info(atom(), +-spec insert_channel_info(gateway_name(), emqx_types:clientid(), emqx_types:infos(), emqx_types:stats()) -> ok. -insert_channel_info(Type, ClientId, Info, Stats) -> +insert_channel_info(GwName, ClientId, Info, Stats) -> Chan = {ClientId, self()}, - true = ets:insert(tabname(info, Type), {Chan, Info, Stats}), + true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}), %%?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @doc Get info of a channel. --spec get_chan_info(gateway_type(), emqx_types:clientid()) +-spec get_chan_info(gateway_name(), emqx_types:clientid()) -> emqx_types:infos() | undefined. -get_chan_info(Type, ClientId) -> - with_channel(Type, ClientId, +get_chan_info(GwName, ClientId) -> + with_channel(GwName, ClientId, fun(ChanPid) -> - get_chan_info(Type, ClientId, ChanPid) + get_chan_info(GwName, ClientId, ChanPid) end). --spec get_chan_info(gateway_type(), emqx_types:clientid(), pid()) +-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined. -get_chan_info(Type, ClientId, ChanPid) when node(ChanPid) == node() -> +get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(tabname(info, Type), Chan, 2) + try ets:lookup_element(tabname(info, GwName), Chan, 2) catch error:badarg -> undefined end; -get_chan_info(Type, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [Type, ClientId, ChanPid]). +get_chan_info(GwName, ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]). %% @doc Update infos of the channel. --spec set_chan_info(gateway_type(), +-spec set_chan_info(gateway_name(), emqx_types:clientid(), emqx_types:infos()) -> boolean(). -set_chan_info(Type, ClientId, Infos) -> - set_chan_info(Type, ClientId, self(), Infos). +set_chan_info(GwName, ClientId, Infos) -> + set_chan_info(GwName, ClientId, self(), Infos). --spec set_chan_info(gateway_type(), +-spec set_chan_info(gateway_name(), emqx_types:clientid(), pid(), emqx_types:infos()) -> boolean(). -set_chan_info(Type, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> +set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try ets:update_element(tabname(info, Type), Chan, {2, Infos}) + try ets:update_element(tabname(info, GwName), Chan, {2, Infos}) catch error:badarg -> false end; -set_chan_info(Type, ClientId, ChanPid, Infos) -> - rpc_call(node(ChanPid), set_chan_info, [Type, ClientId, ChanPid, Infos]). +set_chan_info(GwName, ClientId, ChanPid, Infos) -> + rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]). %% @doc Get channel's stats. --spec get_chan_stats(gateway_type(), emqx_types:clientid()) +-spec get_chan_stats(gateway_name(), emqx_types:clientid()) -> emqx_types:stats() | undefined. -get_chan_stats(Type, ClientId) -> - with_channel(Type, ClientId, +get_chan_stats(GwName, ClientId) -> + with_channel(GwName, ClientId, fun(ChanPid) -> - get_chan_stats(Type, ClientId, ChanPid) + get_chan_stats(GwName, ClientId, ChanPid) end). --spec get_chan_stats(gateway_type(), emqx_types:clientid(), pid()) +-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:stats() | undefined. -get_chan_stats(Type, ClientId, ChanPid) when node(ChanPid) == node() -> +get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(tabname(info, Type), Chan, 3) + try ets:lookup_element(tabname(info, GwName), Chan, 3) catch error:badarg -> undefined end; -get_chan_stats(Type, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [Type, ClientId, ChanPid]). +get_chan_stats(GwName, ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]). --spec set_chan_stats(gateway_type(), +-spec set_chan_stats(gateway_name(), emqx_types:clientid(), emqx_types:stats()) -> boolean(). -set_chan_stats(Type, ClientId, Stats) -> - set_chan_stats(Type, ClientId, self(), Stats). +set_chan_stats(GwName, ClientId, Stats) -> + set_chan_stats(GwName, ClientId, self(), Stats). --spec set_chan_stats(gateway_type(), +-spec set_chan_stats(gateway_name(), emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(Type, ClientId, ChanPid, Stats) when node(ChanPid) == node() -> +set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() -> Chan = {ClientId, self()}, - try ets:update_element(tabname(info, Type), Chan, {3, Stats}) + try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) catch error:badarg -> false end; -set_chan_stats(Type, ClientId, ChanPid, Stats) -> - rpc_call(node(ChanPid), set_chan_stats, [Type, ClientId, ChanPid, Stats]). +set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]). --spec connection_closed(gateway_type(), emqx_types:clientid()) -> true. -connection_closed(Type, ClientId) -> +-spec connection_closed(gateway_name(), emqx_types:clientid()) -> true. +connection_closed(GwName, ClientId) -> %% XXX: Why we need to delete conn_mod tab ??? Chan = {ClientId, self()}, - ets:delete_object(tabname(conn, Type), Chan). + ets:delete_object(tabname(conn, GwName), Chan). --spec open_session(Type :: atom(), CleanStart :: boolean(), +-spec open_session(GwName :: gateway_name(), + CleanStart :: boolean(), ClientInfo :: emqx_types:clientinfo(), ConnInfo :: emqx_types:conninfo(), CreateSessionFun :: fun((emqx_types:clientinfo(), @@ -226,24 +231,24 @@ connection_closed(Type, ClientId) -> }} | {error, any()}. -open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> - open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). +open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> + open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). -open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> +open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> Self = self(), ClientId = maps:get(clientid, ClientInfo), Fun = fun(_) -> - ok = discard_session(Type, ClientId), - Session = create_session(Type, + ok = discard_session(GwName, ClientId), + Session = create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod ), - register_channel(Type, ClientId, Self, ConnInfo), + register_channel(GwName, ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end, - locker_trans(Type, ClientId, Fun); + locker_trans(GwName, ClientId, Fun); open_session(_Type, false = _CleanStart, _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) -> @@ -251,13 +256,13 @@ open_session(_Type, false = _CleanStart, {error, not_supported_now}. %% @private -create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> +create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> try Session = emqx_gateway_utils:apply( CreateSessionFun, [ClientInfo, ConnInfo] ), - ok = emqx_gateway_metrics:inc(Type, 'session.created'), + ok = emqx_gateway_metrics:inc(GwName, 'session.created'), SessionInfo = case is_tuple(Session) andalso element(1, Session) == session of true -> SessionMod:info(Session); @@ -279,17 +284,17 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> end. %% @doc Discard all the sessions identified by the ClientId. --spec discard_session(Type :: atom(), binary()) -> ok. -discard_session(Type, ClientId) when is_binary(ClientId) -> - case lookup_channels(Type, ClientId) of +-spec discard_session(GwName :: gateway_name(), binary()) -> ok. +discard_session(GwName, ClientId) when is_binary(ClientId) -> + case lookup_channels(GwName, ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(Type, ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids) end. %% @private -do_discard_session(Type, ClientId, Pid) -> +do_discard_session(GwName, ClientId, Pid) -> try - discard_session(Type, ClientId, Pid) + discard_session(GwName, ClientId, Pid) catch _ : noproc -> % emqx_ws_connection: call %?tp(debug, "session_already_gone", #{pid => Pid}), @@ -307,72 +312,72 @@ do_discard_session(Type, ClientId, Pid) -> end. %% @private -discard_session(Type, ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(Type, ClientId, ChanPid) of +discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(GwName, ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> ConnMod:call(ChanPid, discard, ?T_TAKEOVER) end; %% @private -discard_session(Type, ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [Type, ClientId, ChanPid]). +discard_session(GwName, ClientId, ChanPid) -> + rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]). --spec kick_session(gateway_type(), emqx_types:clientid()) +-spec kick_session(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. -kick_session(Type, ClientId) -> - case lookup_channels(Type, ClientId) of +kick_session(GwName, ClientId) -> + case lookup_channels(GwName, ClientId) of [] -> {error, not_found}; [ChanPid] -> - kick_session(Type, ClientId, ChanPid); + kick_session(GwName, ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), ?LOG(error, "More than one channel found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> - catch discard_session(Type, ClientId, StalePid) + catch discard_session(GwName, ClientId, StalePid) end, StalePids), - kick_session(Type, ClientId, ChanPid) + kick_session(GwName, ClientId, ChanPid) end. -kick_session(Type, ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(Type, ClientId, ChanPid) of +kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chan_info(GwName, ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} end; -kick_session(Type, ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [Type, ClientId, ChanPid]). +kick_session(GwName, ClientId, ChanPid) -> + rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]). -with_channel(Type, ClientId, Fun) -> - case lookup_channels(Type, ClientId) of +with_channel(GwName, ClientId, Fun) -> + case lookup_channels(GwName, ClientId) of [] -> undefined; [Pid] -> Fun(Pid); Pids -> Fun(lists:last(Pids)) end. %% @doc Lookup channels. --spec(lookup_channels(atom(), emqx_types:clientid()) -> list(pid())). -lookup_channels(Type, ClientId) -> - emqx_gateway_cm_registry:lookup_channels(Type, ClientId). +-spec(lookup_channels(gateway_name(), emqx_types:clientid()) -> list(pid())). +lookup_channels(GwName, ClientId) -> + emqx_gateway_cm_registry:lookup_channels(GwName, ClientId). -get_chann_conn_mod(Type, ClientId, ChanPid) when node(ChanPid) == node() -> +get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try [ConnMod] = ets:lookup_element(tabname(conn, Type), Chan, 2), ConnMod + try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod catch error:badarg -> undefined end; -get_chann_conn_mod(Type, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [Type, ClientId, ChanPid]). +get_chann_conn_mod(GwName, ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]). %% Locker locker_trans(_Type, undefined, Fun) -> Fun([]); -locker_trans(Type, ClientId, Fun) -> - Locker = lockername(Type), +locker_trans(GwName, ClientId, Fun) -> + Locker = lockername(GwName), case locker_lock(Locker, ClientId) of {true, Nodes} -> try Fun(Nodes) after locker_unlock(Locker, ClientId) end; @@ -401,27 +406,27 @@ cast(Name, Msg) -> %%-------------------------------------------------------------------- init(Options) -> - Type = proplists:get_value(type, Options), + GwName = proplists:get_value(gwname, Options), TabOpts = [public, {write_concurrency, true}], - {ChanTab, ConnTab, InfoTab} = cmtabs(Type), + {ChanTab, ConnTab, InfoTab} = cmtabs(GwName), ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(ConnTab, [bag | TabOpts]), ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]), %% Start link cm-registry process %% XXX: Should I hang it under a higher level supervisor? - {ok, Registry} = emqx_gateway_cm_registry:start_link(Type), + {ok, Registry} = emqx_gateway_cm_registry:start_link(GwName), %% Start locker process - {ok, Locker} = ekka_locker:start_link(lockername(Type)), + {ok, Locker} = ekka_locker:start_link(lockername(GwName)), %% Interval update stats %% TODO: v0.2 %ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), - {ok, #state{type = Type, + {ok, #state{gwname = GwName, locker = Locker, registry = Registry, chan_pmon = emqx_pmon:new()}}. @@ -438,12 +443,12 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state{type = Type, chan_pmon = PMon}) -> + State = #state{gwname = GwName, chan_pmon = PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - CmTabs = cmtabs(Type), - ok = emqx_pool:async_submit(fun do_unregister_channel_task/3, [Items, Type, CmTabs]), + CmTabs = cmtabs(GwName), + ok = emqx_pool:async_submit(fun do_unregister_channel_task/3, [Items, GwName, CmTabs]), {noreply, State#state{chan_pmon = PMon1}}; handle_info(_Info, State) -> @@ -455,18 +460,18 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -do_unregister_channel_task(Items, Type, CmTabs) -> +do_unregister_channel_task(Items, GwName, CmTabs) -> lists:foreach( fun({ChanPid, ClientId}) -> - do_unregister_channel(Type, {ClientId, ChanPid}, CmTabs) + do_unregister_channel(GwName, {ClientId, ChanPid}, CmTabs) end, Items). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -do_unregister_channel(Type, Chan, {ChanTab, ConnTab, InfoTab}) -> - ok = emqx_gateway_cm_registry:unregister_channel(Type, Chan), +do_unregister_channel(GwName, Chan, {ChanTab, ConnTab, InfoTab}) -> + ok = emqx_gateway_cm_registry:unregister_channel(GwName, Chan), true = ets:delete(ConnTab, Chan), true = ets:delete(InfoTab, Chan), ets:delete_object(ChanTab, Chan). diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index d9517b53f..22391d0b6 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -27,10 +27,8 @@ %% configuration, register devices and other common operations. %% -type context() :: - #{ %% Gateway Instance ID - instid := instance_id() - %% Gateway ID - , type := gateway_type() + #{ %% Gateway Name + gwname := gateway_name() %% Autenticator , auth := emqx_authn:chain_id() | undefined %% The ConnectionManager PID @@ -98,41 +96,43 @@ authenticate(_Ctx, ClientInfo) -> }} | {error, any()}. open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> - open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). + open_session(Ctx, CleanStart, ClientInfo, ConnInfo, + CreateSessionFun, emqx_session). open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> logger:warning("clean_start=false is not supported now, " "fallback to clean_start mode"), open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod); -open_session(_Ctx = #{type := Type}, +open_session(_Ctx = #{gwname := GwName}, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> - emqx_gateway_cm:open_session(Type, CleanStart, - ClientInfo, ConnInfo, CreateSessionFun, SessionMod). + emqx_gateway_cm:open_session(GwName, CleanStart, + ClientInfo, ConnInfo, + CreateSessionFun, SessionMod). -spec insert_channel_info(context(), emqx_types:clientid(), emqx_types:infos(), emqx_types:stats()) -> ok. -insert_channel_info(_Ctx = #{type := Type}, ClientId, Infos, Stats) -> - emqx_gateway_cm:insert_channel_info(Type, ClientId, Infos, Stats). +insert_channel_info(_Ctx = #{gwname := GwName}, ClientId, Infos, Stats) -> + emqx_gateway_cm:insert_channel_info(GwName, ClientId, Infos, Stats). %% @doc Set the Channel Info to the ConnectionManager for this client -spec set_chan_info(context(), emqx_types:clientid(), emqx_types:infos()) -> boolean(). -set_chan_info(_Ctx = #{type := Type}, ClientId, Infos) -> - emqx_gateway_cm:set_chan_info(Type, ClientId, Infos). +set_chan_info(_Ctx = #{gwname := GwName}, ClientId, Infos) -> + emqx_gateway_cm:set_chan_info(GwName, ClientId, Infos). -spec set_chan_stats(context(), emqx_types:clientid(), emqx_types:stats()) -> boolean(). -set_chan_stats(_Ctx = #{type := Type}, ClientId, Stats) -> - emqx_gateway_cm:set_chan_stats(Type, ClientId, Stats). +set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) -> + emqx_gateway_cm:set_chan_stats(GwName, ClientId, Stats). -spec connection_closed(context(), emqx_types:clientid()) -> boolean(). -connection_closed(_Ctx = #{type := Type}, ClientId) -> - emqx_gateway_cm:connection_closed(Type, ClientId). +connection_closed(_Ctx = #{gwname := GwName}, ClientId) -> + emqx_gateway_cm:connection_closed(GwName, ClientId). -spec authorize(context(), emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) @@ -140,11 +140,11 @@ connection_closed(_Ctx = #{type := Type}, ClientId) -> authorize(_Ctx, ClientInfo, PubSub, Topic) -> emqx_access_control:authorize(ClientInfo, PubSub, Topic). -metrics_inc(_Ctx = #{type := Type}, Name) -> - emqx_gateway_metrics:inc(Type, Name). +metrics_inc(_Ctx = #{gwname := GwName}, Name) -> + emqx_gateway_metrics:inc(GwName, Name). -metrics_inc(_Ctx = #{type := Type}, Name, Oct) -> - emqx_gateway_metrics:inc(Type, Name, Oct). +metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) -> + emqx_gateway_metrics:inc(GwName, Name, Oct). %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl index 37bfd10a3..bfde2b562 100644 --- a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl @@ -42,18 +42,18 @@ %% APIs %%-------------------------------------------------------------------- -start_link(Type) -> - supervisor:start_link({local, Type}, ?MODULE, [Type]). +start_link(GwName) -> + supervisor:start_link({local, GwName}, ?MODULE, [GwName]). -spec create_insta(pid(), gateway(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}. -create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) -> - case emqx_gateway_utils:find_sup_child(Sup, GwType) of +create_insta(Sup, Gateway = #{name := Name}, GwDscrptr) -> + case emqx_gateway_utils:find_sup_child(Sup, Name) of {ok, _GwInstaPid} -> {error, alredy_existed}; false -> - Ctx = ctx(Sup, GwType), + Ctx = ctx(Sup, Name), %% ChildSpec = emqx_gateway_utils:childspec( - GwType, + Name, worker, emqx_gateway_insta_sup, [Gateway, Ctx, GwDscrptr] @@ -63,34 +63,34 @@ create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) -> ) end. --spec remove_insta(pid(), GwType :: gateway_type()) -> ok | {error, any()}. -remove_insta(Sup, GwType) -> - case emqx_gateway_utils:find_sup_child(Sup, GwType) of +-spec remove_insta(pid(), Name :: gateway_name()) -> ok | {error, any()}. +remove_insta(Sup, Name) -> + case emqx_gateway_utils:find_sup_child(Sup, Name) of false -> ok; {ok, _GwInstaPid} -> - ok = supervisor:terminate_child(Sup, GwType), - ok = supervisor:delete_child(Sup, GwType) + ok = supervisor:terminate_child(Sup, Name), + ok = supervisor:delete_child(Sup, Name) end. -spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}. -update_insta(Sup, NewGateway = #{type := GwType}) -> - case emqx_gateway_utils:find_sup_child(Sup, GwType) of +update_insta(Sup, NewGateway = #{name := Name}) -> + case emqx_gateway_utils:find_sup_child(Sup, Name) of false -> {error, not_found}; {ok, GwInstaPid} -> emqx_gateway_insta_sup:update(GwInstaPid, NewGateway) end. --spec start_insta(pid(), gateway_type()) -> ok | {error, any()}. -start_insta(Sup, GwType) -> - case emqx_gateway_utils:find_sup_child(Sup, GwType) of +-spec start_insta(pid(), gateway_name()) -> ok | {error, any()}. +start_insta(Sup, Name) -> + case emqx_gateway_utils:find_sup_child(Sup, Name) of false -> {error, not_found}; {ok, GwInstaPid} -> emqx_gateway_insta_sup:enable(GwInstaPid) end. --spec stop_insta(pid(), gateway_type()) -> ok | {error, any()}. -stop_insta(Sup, GwType) -> - case emqx_gateway_utils:find_sup_child(Sup, GwType) of +-spec stop_insta(pid(), gateway_name()) -> ok | {error, any()}. +stop_insta(Sup, Name) -> + case emqx_gateway_utils:find_sup_child(Sup, Name) of false -> {error, not_found}; {ok, GwInstaPid} -> emqx_gateway_insta_sup:disable(GwInstaPid) @@ -99,33 +99,31 @@ stop_insta(Sup, GwType) -> -spec list_insta(pid()) -> [gateway()]. list_insta(Sup) -> lists:filtermap( - fun({GwType, GwInstaPid, _Type, _Mods}) -> - is_gateway_insta_id(GwType) + fun({Name, GwInstaPid, _Type, _Mods}) -> + is_gateway_insta_id(Name) andalso {true, emqx_gateway_insta_sup:info(GwInstaPid)} end, supervisor:which_children(Sup)). %% Supervisor callback %% @doc Initialize Top Supervisor for a Protocol -init([Type]) -> +init([GwName]) -> SupFlags = #{ strategy => one_for_one , intensity => 10 , period => 60 }, - CmOpts = [{type, Type}], + CmOpts = [{gwname, GwName}], CM = emqx_gateway_utils:childspec(worker, emqx_gateway_cm, [CmOpts]), - Metrics = emqx_gateway_utils:childspec(worker, emqx_gateway_metrics, [Type]), + Metrics = emqx_gateway_utils:childspec(worker, emqx_gateway_metrics, [GwName]), {ok, {SupFlags, [CM, Metrics]}}. %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -ctx(Sup, GwType) -> - {_, Type} = erlang:process_info(Sup, registered_name), +ctx(Sup, Name) -> {ok, CM} = emqx_gateway_utils:find_sup_child(Sup, emqx_gateway_cm), - #{ instid => GwType - , type => Type + #{ gwname => Name , cm => CM }. diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 0cdf15f9b..404766719 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -86,8 +86,8 @@ call(Pid, Req) -> init([Gateway, Ctx0, _GwDscrptr]) -> process_flag(trap_exit, true), - #{type := GwType, rawconf := RawConf} = Gateway, - Ctx = do_init_context(GwType, RawConf, Ctx0), + #{name := GwName, rawconf := RawConf} = Gateway, + Ctx = do_init_context(GwName, RawConf, Ctx0), State = #state{ gw = Gateway, ctx = Ctx, @@ -102,11 +102,11 @@ init([Gateway, Ctx0, _GwDscrptr]) -> {ok, NState} end. -do_init_context(GwType, RawConf, Ctx) -> +do_init_context(GwName, RawConf, Ctx) -> Auth = case maps:get(authentication, RawConf, #{enable => false}) of #{enable := true, authenticators := AuthCfgs} when is_list(AuthCfgs) -> - create_authenticators_for_gateway_insta(GwType, AuthCfgs); + create_authenticators_for_gateway_insta(GwName, AuthCfgs); _ -> undefined end, @@ -116,8 +116,8 @@ do_deinit_context(Ctx) -> cleanup_authenticators_for_gateway_insta(maps:get(auth, Ctx)), ok. -handle_call(info, _From, State = #state{gw = Gateway}) -> - {reply, Gateway, State}; +handle_call(info, _From, State = #state{gw = Gateway, status = Status}) -> + {reply, Gateway#{status => Status}, State}; handle_call(disable, _From, State = #state{status = Status}) -> case Status of @@ -146,21 +146,22 @@ handle_call(enable, _From, State = #state{status = Status}) -> end; %% Stopped -> update -handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, - status = stopped}) -> - case maps:get(type, NewGateway, undefined) - == maps:get(type, Gateway, undefined) of +handle_call({update, NewGateway}, _From, State = #state{ + gw = Gateway, + status = stopped}) -> + case maps:get(name, NewGateway, undefined) + == maps:get(name, Gateway, undefined) of true -> {reply, ok, State#state{gw = NewGateway}}; false -> - {reply, {error, gateway_type_not_match}, State} + {reply, {error, gateway_name_not_match}, State} end; %% Running -> update handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, status = running}) -> - case maps:get(type, NewGateway, undefined) - == maps:get(type, Gateway, undefined) of + case maps:get(name, NewGateway, undefined) + == maps:get(name, Gateway, undefined) of true -> case cb_gateway_update(NewGateway, State) of {ok, NState} -> @@ -169,7 +170,7 @@ handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, {reply, {error, Reason}, State} end; false -> - {reply, {error, gateway_type_not_match}, State} + {reply, {error, gateway_name_not_match}, State} end; handle_call(_Request, _From, State) -> @@ -215,8 +216,8 @@ code_change(_OldVsn, State, _Extra) -> %% @doc AuthCfgs is a array of authenticatior configurations, %% see: emqx_authn_schema:authenticators/1 -create_authenticators_for_gateway_insta(GwType, AuthCfgs) -> - ChainId = atom_to_binary(GwType, utf8), +create_authenticators_for_gateway_insta(GwName, AuthCfgs) -> + ChainId = atom_to_binary(GwName, utf8), case emqx_authn:create_chain(#{id => ChainId}) of {ok, _ChainInfo} -> Results = lists:map(fun(AuthCfg = #{name := Name}) -> @@ -250,10 +251,10 @@ cleanup_authenticators_for_gateway_insta(ChainId) -> "reason: ~p", [ChainId, Reason]) end. -cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType}, +cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName}, gw_state = GwState}) -> try - #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), CbMod:on_gateway_unload(Gateway, GwState), {ok, State#state{child_pids = [], gw_state = undefined, @@ -267,10 +268,10 @@ cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType}, {error, {Class, Reason, Stk}} end. -cb_gateway_load(State = #state{gw = Gateway = #{type := GwType}, +cb_gateway_load(State = #state{gw = Gateway = #{name := GwName}, ctx = Ctx}) -> try - #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), case CbMod:on_gateway_load(Gateway, Ctx) of {error, Reason} -> throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, GwState} -> @@ -285,17 +286,17 @@ cb_gateway_load(State = #state{gw = Gateway = #{type := GwType}, Class : Reason1 : Stk -> logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: " "{~p, ~p}, stacktrace: ~0p", - [GwType, Gateway, Ctx, + [GwName, Gateway, Ctx, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end. cb_gateway_update(NewGateway, - State = #state{gw = Gateway = #{type := GwType}, + State = #state{gw = Gateway = #{name := GwName}, ctx = Ctx, gw_state = GwState}) -> try - #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of {error, Reason} -> throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, NGwState} -> diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index d4e39443c..458017118 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -47,36 +47,36 @@ %% APIs %%-------------------------------------------------------------------- -start_link(Type) -> - gen_server:start_link(?MODULE, [Type], []). +start_link(GwName) -> + gen_server:start_link(?MODULE, [GwName], []). --spec inc(gateway_type(), atom()) -> ok. -inc(Type, Name) -> - inc(Type, Name, 1). +-spec inc(gateway_name(), atom()) -> ok. +inc(GwName, Name) -> + inc(GwName, Name, 1). --spec inc(gateway_type(), atom(), integer()) -> ok. -inc(Type, Name, Oct) -> - ets:update_counter(tabname(Type), Name, {2, Oct}, {Name, 0}), +-spec inc(gateway_name(), atom(), integer()) -> ok. +inc(GwName, Name, Oct) -> + ets:update_counter(tabname(GwName), Name, {2, Oct}, {Name, 0}), ok. --spec dec(gateway_type(), atom()) -> ok. -dec(Type, Name) -> - inc(Type, Name, -1). +-spec dec(gateway_name(), atom()) -> ok. +dec(GwName, Name) -> + inc(GwName, Name, -1). --spec dec(gateway_type(), atom(), non_neg_integer()) -> ok. -dec(Type, Name, Oct) -> - inc(Type, Name, -Oct). +-spec dec(gateway_name(), atom(), non_neg_integer()) -> ok. +dec(GwName, Name, Oct) -> + inc(GwName, Name, -Oct). -tabname(Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_metrics'])). +tabname(GwName) -> + list_to_atom(lists:concat([emqx_gateway_, GwName, '_metrics'])). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Type]) -> +init([GwName]) -> TabOpts = [public, {write_concurrency, true}], - ok = emqx_tables:new(tabname(Type), [set|TabOpts]), + ok = emqx_tables:new(tabname(GwName), [set|TabOpts]), {ok, #state{}}. handle_call(_Request, _From, State) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_registry.erl b/apps/emqx_gateway/src/emqx_gateway_registry.erl index 6eeb958ed..b311073a9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_registry.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The Registry Centre of Gateway Type +%% @doc The Registry Centre of Gateway -module(emqx_gateway_registry). -include("include/emqx_gateway.hrl"). @@ -42,7 +42,7 @@ ]). -record(state, { - reged = #{} :: #{ gateway_type() => descriptor() } + reged = #{} :: #{ gateway_name() => descriptor() } }). -type registry_options() :: [registry_option()]. @@ -64,33 +64,33 @@ start_link() -> %% Mgmt %%-------------------------------------------------------------------- --spec reg(gateway_type(), registry_options()) +-spec reg(gateway_name(), registry_options()) -> ok | {error, any()}. -reg(Type, RgOpts) -> - CbMod = proplists:get_value(cbkmod, RgOpts, Type), +reg(Name, RgOpts) -> + CbMod = proplists:get_value(cbkmod, RgOpts, Name), Dscrptr = #{ cbkmod => CbMod , rgopts => RgOpts }, - call({reg, Type, Dscrptr}). + call({reg, Name, Dscrptr}). --spec unreg(gateway_type()) -> ok | {error, any()}. -unreg(Type) -> +-spec unreg(gateway_name()) -> ok | {error, any()}. +unreg(Name) -> %% TODO: Checking ALL INSTACE HAS STOPPED - call({unreg, Type}). + call({unreg, Name}). %% TODO: -%unreg(Type, Force) -> -% call({unreg, Type, Froce}). +%unreg(Name, Force) -> +% call({unreg, Name, Froce}). %% @doc Return all registered protocol gateway implementation --spec list() -> [{gateway_type(), descriptor()}]. +-spec list() -> [{gateway_name(), descriptor()}]. list() -> call(all). --spec lookup(gateway_type()) -> undefined | descriptor(). -lookup(Type) -> - call({lookup, Type}). +-spec lookup(gateway_name()) -> undefined | descriptor(). +lookup(Name) -> + call({lookup, Name}). call(Req) -> gen_server:call(?MODULE, Req, 5000). @@ -104,29 +104,29 @@ init([]) -> process_flag(trap_exit, true), {ok, #state{reged = #{}}}. -handle_call({reg, Type, Dscrptr}, _From, State = #state{reged = Gateways}) -> - case maps:get(Type, Gateways, notfound) of +handle_call({reg, Name, Dscrptr}, _From, State = #state{reged = Gateways}) -> + case maps:get(Name, Gateways, notfound) of notfound -> - NGateways = maps:put(Type, Dscrptr, Gateways), + NGateways = maps:put(Name, Dscrptr, Gateways), {reply, ok, State#state{reged = NGateways}}; _ -> {reply, {error, already_existed}, State} end; -handle_call({unreg, Type}, _From, State = #state{reged = Gateways}) -> - case maps:get(Type, Gateways, undefined) of +handle_call({unreg, Name}, _From, State = #state{reged = Gateways}) -> + case maps:get(Name, Gateways, undefined) of undefined -> {reply, ok, State}; _ -> - _ = emqx_gateway_sup:unload_gateway(Type), - {reply, ok, State#state{reged = maps:remove(Type, Gateways)}} + _ = emqx_gateway_sup:unload_gateway(Name), + {reply, ok, State#state{reged = maps:remove(Name, Gateways)}} end; handle_call(all, _From, State = #state{reged = Gateways}) -> {reply, maps:to_list(Gateways), State}; -handle_call({lookup, Type}, _From, State = #state{reged = Gateways}) -> - Reply = maps:get(Type, Gateways, undefined), +handle_call({lookup, Name}, _From, State = #state{reged = Gateways}) -> + Reply = maps:get(Name, Gateways, undefined), {reply, Reply, State}; handle_call(Req, _From, State) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index b925d420e..87e41d93b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -44,27 +44,27 @@ start_link() -> -spec load_gateway(gateway()) -> {ok, pid()} | {error, any()}. -load_gateway(Gateway = #{type := GwType}) -> - case emqx_gateway_registry:lookup(GwType) of - undefined -> {error, {unknown_gateway_type, GwType}}; +load_gateway(Gateway = #{name := GwName}) -> + case emqx_gateway_registry:lookup(GwName) of + undefined -> {error, {unknown_gateway_name, GwName}}; GwDscrptr -> - {ok, GwSup} = ensure_gateway_suptree_ready(GwType), + {ok, GwSup} = ensure_gateway_suptree_ready(GwName), emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr) end. --spec unload_gateway(gateway_type()) -> ok | {error, not_found}. -unload_gateway(GwType) -> - case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of +-spec unload_gateway(gateway_name()) -> ok | {error, not_found}. +unload_gateway(GwName) -> + case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of false -> {error, not_found}; _ -> - _ = supervisor:terminate_child(?MODULE, GwType), - _ = supervisor:delete_child(?MODULE, GwType), + _ = supervisor:terminate_child(?MODULE, GwName), + _ = supervisor:delete_child(?MODULE, GwName), ok end. --spec lookup_gateway(gateway_type()) -> gateway() | undefined. -lookup_gateway(GwType) -> - case search_gateway_insta_proc(GwType) of +-spec lookup_gateway(gateway_name()) -> gateway() | undefined. +lookup_gateway(GwName) -> + case search_gateway_insta_proc(GwName) of {ok, {_, GwInstaPid}} -> emqx_gateway_insta_sup:info(GwInstaPid); _ -> @@ -74,25 +74,25 @@ lookup_gateway(GwType) -> -spec update_gateway(gateway()) -> ok | {error, any()}. -update_gateway(NewGateway = #{type := GwType}) -> - case emqx_gateway_utils:find_sup_child(?MODULE, GwType) of +update_gateway(NewGateway = #{name := GwName}) -> + case emqx_gateway_utils:find_sup_child(?MODULE, GwName) of {ok, GwSup} -> emqx_gateway_gw_sup:update_insta(GwSup, NewGateway); _ -> {error, not_found} end. -start_gateway_insta(GwType) -> - case search_gateway_insta_proc(GwType) of +start_gateway_insta(GwName) -> + case search_gateway_insta_proc(GwName) of {ok, {GwSup, _}} -> - emqx_gateway_gw_sup:start_insta(GwSup, GwType); + emqx_gateway_gw_sup:start_insta(GwSup, GwName); _ -> {error, not_found} end. --spec stop_gateway_insta(gateway_type()) -> ok | {error, any()}. -stop_gateway_insta(GwType) -> - case search_gateway_insta_proc(GwType) of +-spec stop_gateway_insta(gateway_name()) -> ok | {error, any()}. +stop_gateway_insta(GwName) -> + case search_gateway_insta_proc(GwName) of {ok, {GwSup, _}} -> - emqx_gateway_gw_sup:stop_insta(GwSup, GwType); + emqx_gateway_gw_sup:stop_insta(GwSup, GwName); _ -> {error, not_found} end. @@ -103,9 +103,9 @@ list_gateway_insta() -> emqx_gateway_gw_sup:list_insta(SupId) end, list_started_gateway())). --spec list_started_gateway() -> [gateway_type()]. +-spec list_started_gateway() -> [gateway_name()]. list_started_gateway() -> - started_gateway_type(). + started_gateway(). %% Supervisor callback @@ -122,14 +122,14 @@ init([]) -> %% Internal funcs %%-------------------------------------------------------------------- -ensure_gateway_suptree_ready(GwType) -> - case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of +ensure_gateway_suptree_ready(GwName) -> + case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of false -> ChildSpec = emqx_gateway_utils:childspec( - GwType, + GwName, supervisor, emqx_gateway_gw_sup, - [GwType] + [GwName] ), emqx_gateway_utils:supervisor_ret( supervisor:start_child(?MODULE, ChildSpec) @@ -150,7 +150,7 @@ search_gateway_insta_proc(InstaId, [SupPid|More]) -> search_gateway_insta_proc(InstaId, More) end. -started_gateway_type() -> +started_gateway() -> lists:filtermap( fun({Id, _, _, _}) -> is_a_gateway_id(Id) andalso {true, Id} diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 4fb5cd05a..8c500be82 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -47,9 +47,9 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -start_grpc_server(_GwType, undefined) -> +start_grpc_server(_GwName, undefined) -> undefined; -start_grpc_server(GwType, Options = #{bind := ListenOn}) -> +start_grpc_server(GwName, Options = #{bind := ListenOn}) -> Services = #{protos => [emqx_exproto_pb], services => #{ 'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr} @@ -59,12 +59,12 @@ start_grpc_server(GwType, Options = #{bind := ListenOn}) -> SslOpts -> [{ssl_options, SslOpts}] end, - _ = grpc:start_server(GwType, ListenOn, Services, SvrOptions), - ?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]). + _ = grpc:start_server(GwName, ListenOn, Services, SvrOptions), + ?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwName, ListenOn]). start_grpc_client_channel(_GwType, undefined) -> undefined; -start_grpc_client_channel(GwType, Options = #{address := UriStr}) -> +start_grpc_client_channel(GwName, Options = #{address := UriStr}) -> UriMap = uri_string:parse(UriStr), Scheme = maps:get(scheme, UriMap), Host = maps:get(host, UriMap), @@ -81,36 +81,36 @@ start_grpc_client_channel(GwType, Options = #{address := UriStr}) -> transport_opts => SslOpts}}; _ -> #{} end, - grpc_client_sup:create_channel_pool(GwType, SvrAddr, ClientOpts). + grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts). -on_gateway_load(_Gateway = #{ type := GwType, +on_gateway_load(_Gateway = #{ name := GwName, rawconf := RawConf }, Ctx) -> %% XXX: How to monitor it ? %% Start grpc client pool & client channel - PoolName = pool_name(GwType), + PoolName = pool_name(GwName), PoolSize = emqx_vm:schedulers() * 2, {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, {emqx_exproto_gcli, start_link, []}), - _ = start_grpc_client_channel(GwType, maps:get(handler, RawConf, undefined)), + _ = start_grpc_client_channel(GwName, maps:get(handler, RawConf, undefined)), %% XXX: How to monitor it ? - _ = start_grpc_server(GwType, maps:get(server, RawConf, undefined)), + _ = start_grpc_server(GwName, maps:get(server, RawConf, undefined)), NRawConf = maps:without( [server, handler], RawConf#{pool_name => PoolName} ), Listeners = emqx_gateway_utils:normalize_rawconf( - NRawConf#{handler => GwType} + NRawConf#{handler => GwName} ), ListenerPids = lists:map(fun(Lis) -> - start_listener(GwType, Ctx, Lis) + start_listener(GwName, Ctx, Lis) end, Listeners), {ok, ListenerPids, _GwState = #{ctx => Ctx}}. on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwType = maps:get(type, NewGateway), + GwName = maps:get(name, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? @@ -120,40 +120,40 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> Class : Reason : Stk -> logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [GwType, Class, Reason, Stk]), + [GwName, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_gateway_unload(_Gateway = #{ type := GwType, +on_gateway_unload(_Gateway = #{ name := GwName, rawconf := RawConf }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(GwType, Lis) + stop_listener(GwName, Lis) end, Listeners). -pool_name(GwType) -> - list_to_atom(lists:concat([GwType, "_gcli_pool"])). +pool_name(GwName) -> + list_to_atom(lists:concat([GwName, "_gcli_pool"])). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]), + [GwName, Type, ListenOnStr]), Pid; {error, Reason} -> ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]), + [GwName, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwType, Type), +start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_exproto_frame, @@ -172,8 +172,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> esockd:open_dtls(Name, ListenOn, Opts, MFA). -name(GwType, Type) -> - list_to_atom(lists:concat([GwType, ":", Type])). +name(GwName, Type) -> + list_to_atom(lists:concat([GwName, ":", Type])). merge_default_by_type(Type, Options) when Type =:= tcp; Type =:= ssl -> @@ -196,18 +196,18 @@ merge_default_by_type(Type, Options) when Type =:= udp; [{udp_options, Default} | Options] end. -stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]); + [GwName, Type, ListenOnStr]); {error, Reason} -> ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]) + [GwName, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwType, Type), +stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index cf1a1a017..d5fb0d06e 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -47,7 +47,7 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_gateway_load(_Gateway = #{ type := GwType, +on_gateway_load(_Gateway = #{ name := GwName, rawconf := RawConf }, Ctx) -> @@ -66,12 +66,12 @@ on_gateway_load(_Gateway = #{ type := GwType, Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(GwType, Ctx, Lis) + start_listener(GwName, Ctx, Lis) end, Listeners), {ok, ListenerPids, _GwState = #{ctx => Ctx}}. on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwType = maps:get(type, NewGateway), + GwName = maps:get(name, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? @@ -81,11 +81,11 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> Class : Reason : Stk -> logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [GwType, Class, Reason, Stk]), + [GwName, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_gateway_unload(_Gateway = #{ type := GwType, +on_gateway_unload(_Gateway = #{ name := GwName, rawconf := RawConf }, _GwState) -> %% XXX: @@ -96,28 +96,28 @@ on_gateway_unload(_Gateway = #{ type := GwType, Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(GwType, Lis) + stop_listener(GwName, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]), + [GwName, Type, ListenOnStr]), Pid; {error, Reason} -> ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]), + [GwName, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwType, udp), +start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, udp), NCfg = Cfg#{ctx => Ctx}, NSocketOpts = merge_default(SocketOpts), Options = [{config, NCfg}|NSocketOpts], @@ -128,8 +128,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> lwm2m_coap_server:start_dtls(Name, ListenOn, Options) end. -name(GwType, Type) -> - list_to_atom(lists:concat([GwType, ":", Type])). +name(GwName, Type) -> + list_to_atom(lists:concat([GwName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -141,20 +141,20 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]); + [GwName, Type, ListenOnStr]); {error, Reason} -> ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]) + [GwName, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwType, Type), +stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, Type), case Type of udp -> lwm2m_coap_server:stop_udp(Name, ListenOn); diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 3dfb8546d..d35228e1f 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -47,7 +47,7 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_gateway_load(_Gateway = #{ type := GwType, +on_gateway_load(_Gateway = #{ name := GwName, rawconf := RawConf }, Ctx) -> @@ -64,7 +64,7 @@ on_gateway_load(_Gateway = #{ type := GwType, end, PredefTopics = maps:get(predefined, RawConf, []), - {ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics), + {ok, RegistrySvr} = emqx_sn_registry:start_link(GwName, PredefTopics), NRawConf = maps:without( [broadcast, predefined], @@ -73,11 +73,11 @@ on_gateway_load(_Gateway = #{ type := GwType, Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(GwType, Ctx, Lis) + start_listener(GwName, Ctx, Lis) end, Listeners), {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. -on_gateway_update(NewGateway = #{type := GwType}, OldGateway, +on_gateway_update(NewGateway = #{name := GwName}, OldGateway, GwState = #{ctx := Ctx}) -> try %% XXX: 1. How hot-upgrade the changes ??? @@ -88,37 +88,37 @@ on_gateway_update(NewGateway = #{type := GwType}, OldGateway, Class : Reason : Stk -> logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [GwType, Class, Reason, Stk]), + [GwName, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_gateway_unload(_Insta = #{ type := GwType, +on_gateway_unload(_Insta = #{ name := GwName, rawconf := RawConf }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(GwType, Lis) + stop_listener(GwName, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]), + [GwName, Type, ListenOnStr]), Pid; {error, Reason} -> ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]), + [GwName, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwType, Type), +start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_sn_frame, @@ -127,8 +127,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwType, Type) -> - list_to_atom(lists:concat([GwType, ":", Type])). +name(GwName, Type) -> + list_to_atom(lists:concat([GwName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -140,18 +140,18 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]); + [GwName, Type, ListenOnStr]); {error, Reason} -> ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]) + [GwName, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwType, Type), +stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 84b10e97a..19bfc16ab 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -49,21 +49,21 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_gateway_load(_Gateway = #{ type := GwType, +on_gateway_load(_Gateway = #{ name := GwName, rawconf := RawConf }, Ctx) -> %% Step1. Fold the rawconfs to listeners Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), %% Step2. Start listeners or escokd:specs ListenerPids = lists:map(fun(Lis) -> - start_listener(GwType, Ctx, Lis) + start_listener(GwName, Ctx, Lis) end, Listeners), %% FIXME: How to throw an exception to interrupt the restart logic ? %% FIXME: Assign ctx to GwState {ok, ListenerPids, _GwState = #{ctx => Ctx}}. on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwType = maps:get(type, NewGateway), + GwName = maps:get(name, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old state??? @@ -73,37 +73,37 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> Class : Reason : Stk -> logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [GwType, Class, Reason, Stk]), + [GwName, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_gateway_unload(_Gateway = #{ type := GwType, +on_gateway_unload(_Gateway = #{ name := GwName, rawconf := RawConf }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(GwType, Lis) + stop_listener(GwName, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]), + [GwName, Type, ListenOnStr]), Pid; {error, Reason} -> ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]), + [GwName, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwType, Type), +start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_stomp_frame, @@ -112,8 +112,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwType, Type) -> - list_to_atom(lists:concat([GwType, ":", Type])). +name(GwName, Type) -> + list_to_atom(lists:concat([GwName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_tcp_options(), @@ -125,18 +125,18 @@ merge_default(Options) -> [{tcp_options, Default} | Options] end. -stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwType, Type, ListenOnStr]); + [GwName, Type, ListenOnStr]); {error, Reason} -> ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwType, Type, ListenOnStr, Reason]) + [GwName, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwType, Type), +stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, Type), esockd:close(Name, ListenOn).