emqx/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_broadcast.erl

137 lines
3.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqttsn_broadcast).
-behaviour(gen_server).
-include("emqx_mqttsn.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
start_link/2,
stop/0
]).
%% gen_server
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-record(state, {gwid, sock, port, addrs, duration, tref}).
-define(DEFAULT_DURATION, 15 * 60 * 1000).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec start_link(pos_integer(), inet:port_number()) ->
{ok, pid()} | {error, term()}.
start_link(GwId, Port) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [GwId, Port], []).
-spec stop() -> ok.
stop() ->
gen_server:stop(?MODULE, nomal, infinity).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([GwId, Port]) ->
%% FIXME:
Duration = application:get_env(emqx_mqttsn, advertise_duration, ?DEFAULT_DURATION),
{ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]),
{ok,
ensure_advertise(#state{
gwid = GwId,
addrs = boradcast_addrs(),
sock = Sock,
port = Port,
duration = Duration
})}.
handle_call(Req, _From, State) ->
?SLOG(error, #{
msg => "unexpected_call",
call => Req
}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{
msg => "unexpected_cast",
cast => Msg
}),
{noreply, State}.
handle_info(broadcast_advertise, State) ->
{noreply, ensure_advertise(State), hibernate};
handle_info(Info, State) ->
?SLOG(error, #{
msg => "unexpected_info",
info => Info
}),
{noreply, State}.
terminate(_Reason, #state{tref = Timer}) ->
_ = erlang:cancel_timer(Timer),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
ensure_advertise(State = #state{duration = Duration}) ->
send_advertise(State),
State#state{tref = erlang:send_after(Duration, self(), broadcast_advertise)}.
send_advertise(#state{
gwid = GwId,
sock = Sock,
port = Port,
addrs = Addrs,
duration = Duration
}) ->
Data = emqx_mqttsn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
lists:foreach(
fun(Addr) ->
?SLOG(debug, #{
msg => "send_ADVERTISE_msg",
address => Addr
}),
gen_udp:send(Sock, Addr, Port, Data)
end,
Addrs
).
boradcast_addrs() ->
lists:usort([
Addr
|| {ok, IfList} <- [inet:getiflist()],
If <- IfList,
{ok, [{broadaddr, Addr}]} <- [inet:ifget(If, [broadaddr])]
]).