173 lines
5.8 KiB
Erlang
173 lines
5.8 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(test_mqtt_broker).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-define(LOGT(Format, Args), logger:debug("TEST_BROKER: " ++ Format, Args)).
|
|
|
|
-record(state, {subscriber}).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_router.hrl").
|
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
start(_, <<"attacker">>, _, _, _) ->
|
|
{stop, auth_failure};
|
|
start(ClientId, Username, Password, _Channel, KeepaliveInterval) ->
|
|
true = is_binary(ClientId),
|
|
(true = (is_binary(Username)) orelse (Username == undefined)),
|
|
(true = (is_binary(Password)) orelse (Password == undefined)),
|
|
self() ! {keepalive, start, KeepaliveInterval},
|
|
{ok, []}.
|
|
|
|
publish(Topic, Payload, Qos) ->
|
|
ClientId = <<"lwm2m_test_suite">>,
|
|
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
|
|
emqx:publish(Msg).
|
|
|
|
subscribe(Topic) ->
|
|
gen_server:call(?MODULE, {subscribe, Topic, self()}).
|
|
|
|
unsubscribe(Topic) ->
|
|
gen_server:call(?MODULE, {unsubscribe, Topic}).
|
|
|
|
get_subscrbied_topics() ->
|
|
[Topic || {_Client, Topic} <- ets:tab2list(?SUBSCRIPTION)].
|
|
|
|
start_link() ->
|
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
|
|
stop() ->
|
|
gen_server:stop(?MODULE).
|
|
|
|
init(_Param) ->
|
|
{ok, #state{subscriber = []}}.
|
|
|
|
handle_call({subscribe, Topic, Proc}, _From, State = #state{subscriber = SubList}) ->
|
|
?LOGT("test broker subscribe Topic=~p, Pid=~p~n", [Topic, Proc]),
|
|
is_binary(Topic) orelse error("Topic should be a binary"),
|
|
{reply, {ok, []}, State#state{subscriber = [{Topic, Proc} | SubList]}};
|
|
handle_call(get_subscribed_topics, _From, State = #state{subscriber = SubList}) ->
|
|
Response = subscribed_topics(SubList, []),
|
|
?LOGT("test broker get subscribed topics=~p~n", [Response]),
|
|
{reply, Response, State};
|
|
handle_call({unsubscribe, Topic}, _From, State = #state{subscriber = SubList}) ->
|
|
?LOGT("test broker unsubscribe Topic=~p~n", [Topic]),
|
|
is_binary(Topic) orelse error("Topic should be a binary"),
|
|
NewSubList = proplists:delete(Topic, SubList),
|
|
{reply, {ok, []}, State#state{subscriber = NewSubList}};
|
|
handle_call(
|
|
{publish, {Topic, Msg, MatchedTopicFilter}}, _From, State = #state{subscriber = SubList}
|
|
) ->
|
|
(is_binary(Topic) and is_binary(Msg)) orelse error("Topic and Msg should be binary"),
|
|
Pid = proplists:get_value(MatchedTopicFilter, SubList),
|
|
?LOGT("test broker publish topic=~p, Msg=~p, Pid=~p, MatchedTopicFilter=~p, SubList=~p~n", [
|
|
Topic, Msg, Pid, MatchedTopicFilter, SubList
|
|
]),
|
|
(Pid == undefined) andalso
|
|
?LOGT(
|
|
"!!!!! this topic ~p has never been subscribed, please specify a valid topic filter", [
|
|
MatchedTopicFilter
|
|
]
|
|
),
|
|
?assertNotEqual(undefined, Pid),
|
|
Pid ! {deliver, #message{topic = Topic, payload = Msg}},
|
|
{reply, ok, State};
|
|
handle_call(stop, _From, State) ->
|
|
{stop, normal, stopped, State};
|
|
handle_call(Req, _From, State) ->
|
|
?LOGT("test_broker_server: ignore call Req=~p~n", [Req]),
|
|
{reply, {error, badreq}, State}.
|
|
|
|
handle_cast(Msg, State) ->
|
|
?LOGT("test_broker_server: ignore cast msg=~p~n", [Msg]),
|
|
{noreply, State}.
|
|
|
|
handle_info(Info, State) ->
|
|
?LOGT("test_broker_server: ignore info=~p~n", [Info]),
|
|
{noreply, State}.
|
|
|
|
terminate(Reason, _State) ->
|
|
?LOGT("test_broker_server: terminate Reason=~p~n", [Reason]),
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
subscribed_topics([], Acc) ->
|
|
Acc;
|
|
subscribed_topics([{Topic, _Pid} | T], Acc) ->
|
|
subscribed_topics(T, [Topic | Acc]).
|
|
|
|
-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
|
|
|
|
-type keepalive() :: #keepalive{}.
|
|
|
|
%% @doc Start a keepalive
|
|
-spec start(fun(), integer(), any()) -> undefined | keepalive().
|
|
start(_, 0, _) ->
|
|
undefined;
|
|
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
|
{ok, StatVal} = StatFun(),
|
|
#keepalive{
|
|
statfun = StatFun,
|
|
statval = StatVal,
|
|
tsec = TimeoutSec,
|
|
tmsg = TimeoutMsg,
|
|
tref = timer(TimeoutSec, TimeoutMsg)
|
|
}.
|
|
|
|
%% @doc Check keepalive, called when timeout.
|
|
-spec check(keepalive()) -> {ok, keepalive()} | {error, any()}.
|
|
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
|
case StatFun() of
|
|
{ok, NewVal} ->
|
|
if
|
|
NewVal =/= LastVal ->
|
|
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
|
|
Repeat < 1 ->
|
|
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
|
|
true ->
|
|
{error, timeout}
|
|
end;
|
|
{error, Error} ->
|
|
{error, Error}
|
|
end.
|
|
|
|
resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
|
|
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
|
|
|
|
%% @doc Cancel Keepalive
|
|
-spec cancel(keepalive()) -> ok.
|
|
cancel(#keepalive{tref = TRef}) ->
|
|
cancel(TRef);
|
|
cancel(undefined) ->
|
|
ok;
|
|
cancel(TRef) ->
|
|
catch erlang:cancel_timer(TRef).
|
|
|
|
timer(Sec, Msg) ->
|
|
erlang:send_after(timer:seconds(Sec), self(), Msg).
|
|
|
|
log(Format, Args) ->
|
|
logger:debug(Format, Args).
|