%% File: emqx/apps/emqx_resource/test/emqx_connector_demo.erl

%% Listing metadata captured with the source: 240 lines, 7.3 KiB, Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_demo).
-include_lib("typerefl/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2
]).
-export([counter_loop/0, set_callback_mode/1]).
%% callbacks for emqx_resource config schema
-export([roots/0]).
-define(CM_KEY, {?MODULE, callback_mode}).
%% Config schema roots for this demo resource.
%% Returns a two-element list pairing each field name (`name', `register')
%% with the local fun that describes that field.
roots() ->
    NameField = {name, fun name/1},
    RegisterField = {register, fun register/1},
    [NameField, RegisterField].
%% Schema description of the `name' field.
%% Answers `type' with the result of atom(), `required' with true and any
%% other attribute with `undefined'.
%% NOTE(review): atom/0 is not defined in this file; presumably it comes from
%% the typerefl include -- confirm.
name(Attr) ->
    case Attr of
        type -> atom();
        required -> true;
        _Other -> undefined
    end.
%% Schema description of the `register' field.
%% Answers `type' with the result of boolean(), `required' with true,
%% `default' with false and any other attribute with `undefined'.
%% NOTE(review): boolean/0 is not defined in this file; presumably it comes
%% from the typerefl include -- confirm.
register(Attr) ->
    case Attr of
        type -> boolean();
        required -> true;
        default -> false;
        _Other -> undefined
    end.
%% emqx_resource callback: returns whatever callback mode is currently stored
%% in persistent_term under ?CM_KEY (see set_callback_mode/1).
%% persistent_term:get/1 raises badarg when nothing has been stored yet.
callback_mode() ->
    Key = ?CM_KEY,
    persistent_term:get(Key).
%% Stores Mode in persistent_term under ?CM_KEY so that callback_mode/0
%% returns it afterwards. Returns the result of persistent_term:put/2 (`ok').
set_callback_mode(Mode) ->
    Key = ?CM_KEY,
    persistent_term:put(Key, Mode).
%% emqx_resource callback: starts the demo resource instance.
%%
%% * A config containing `create_error := true' raises error("some error").
%% * Otherwise the config must contain `name'. A counter process is spawned
%%   (and registered under Name when the `register' option is true) and
%%   {ok, State} is returned, where State is the config map extended with
%%   `id' (the instance id), `stop_error' (defaulting to false) and `pid'
%%   (the counter process).
on_start(_InstId, #{create_error := true}) ->
    erlang:error("some error");
on_start(InstId, Opts = #{name := Name}) ->
    ShouldRegister = maps:get(register, Opts, false),
    StopError = maps:get(stop_error, Opts, false),
    CounterPid = spawn_counter_process(Name, ShouldRegister),
    Extra = #{id => InstId, stop_error => StopError, pid => CounterPid},
    {ok, maps:merge(Opts, Extra)}.
%% emqx_resource callback: stops the demo resource instance.
%%
%% * A state with `stop_error := true' returns {error, stop_error} without
%%   touching the counter process.
%% * Otherwise a `shutdown' exit signal is sent to the counter process and
%%   `ok' is returned; the call does not wait for the process to terminate.
on_stop(_InstId, #{stop_error := true}) ->
    {error, stop_error};
on_stop(_InstId, #{pid := CounterPid}) ->
    _ = erlang:exit(CounterPid, shutdown),
    ok.
%% emqx_resource callback: synchronous queries against the demo resource.
%%
%% Supported requests:
%%   get_state                  -> {ok, State}
%%   get_state_failed           -> {error, State}
%%   block | resume             -> forwards the atom to the counter process, ok
%%   {inc_counter, N}           -> ok, or
%%                                 {error, {recoverable_error, incorrect_status}}
%%                                 when the counter answers `incorrect_status'
%%   get_incorrect_status_count -> {ok, Count}
%%   get_counter                -> {ok, Num}
%%
%% The three request/reply queries wait at most 1000 ms for the counter
%% process to answer and return {error, timeout} otherwise.
on_query(_InstId, get_state, State) ->
    {ok, State};
on_query(_InstId, get_state_failed, State) ->
    {error, State};
on_query(_InstId, block, #{pid := CounterPid}) ->
    CounterPid ! block,
    ok;
on_query(_InstId, resume, #{pid := CounterPid}) ->
    CounterPid ! resume,
    ok;
on_query(_InstId, {inc_counter, N}, #{pid := CounterPid}) ->
    Tag = send_counter_request(CounterPid, {inc, N}),
    receive
        {Tag, incorrect_status} ->
            {error, {recoverable_error, incorrect_status}};
        {Tag, ok} ->
            %% Trace point is emitted only for a successful increment.
            ?tp(connector_demo_inc_counter, #{n => N}),
            ok
    after 1000 ->
        {error, timeout}
    end;
on_query(_InstId, get_incorrect_status_count, #{pid := CounterPid}) ->
    Tag = send_counter_request(CounterPid, get_incorrect_status_count),
    await_counter_value(Tag);
on_query(_InstId, get_counter, #{pid := CounterPid}) ->
    Tag = send_counter_request(CounterPid, get),
    await_counter_value(Tag).

%% Sends Request to the counter process as {{CallerPid, Tag}, Request} and
%% returns the fresh reference Tag that the reply will carry.
send_counter_request(CounterPid, Request) ->
    Tag = make_ref(),
    CounterPid ! {{self(), Tag}, Request},
    Tag.

%% Waits up to 1000 ms for a {Tag, Value} reply and wraps it as {ok, Value}.
await_counter_value(Tag) ->
    receive
        {Tag, Value} -> {ok, Value}
    after 1000 ->
        {error, timeout}
    end.
%% emqx_resource callback: asynchronous queries.
%%
%% Forwards the request together with ReplyFun to the counter process and
%% returns `ok' immediately:
%%   {inc_counter, N} is sent as {inc, N, ReplyFun}
%%   get_counter      is sent as {get, ReplyFun}
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := CounterPid}) ->
    _ = erlang:send(CounterPid, {inc, N, ReplyFun}),
    ok;
on_query_async(_InstId, get_counter, ReplyFun, #{pid := CounterPid}) ->
    _ = erlang:send(CounterPid, {get, ReplyFun}),
    ok.
%% emqx_resource callback: synchronous batch query.
%%
%% A batch holds either 'inc_counter' requests or 'get_counter' requests,
%% never a mix. The first element alone decides how the batch is handled;
%% for a 'get_counter' batch the remaining elements are not inspected.
on_batch_query(InstId, BatchReq, State) ->
    FirstReq = hd(BatchReq),
    case FirstReq of
        get_counter ->
            batch_get_counter(sync, InstId, State);
        {inc_counter, _} ->
            batch_inc_counter(sync, InstId, BatchReq, State)
    end.
%% emqx_resource callback: asynchronous batch query.
%%
%% A batch holds either 'inc_counter' requests or 'get_counter' requests,
%% never a mix. The first element alone decides how the batch is handled;
%% ReplyFunAndArgs is passed along as {async, ReplyFunAndArgs}.
on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
    Mode = {async, ReplyFunAndArgs},
    FirstReq = hd(BatchReq),
    case FirstReq of
        get_counter ->
            batch_get_counter(Mode, InstId, State);
        {inc_counter, _} ->
            batch_inc_counter(Mode, InstId, BatchReq, State)
    end.
%% Collapses a batch of {inc_counter, N} requests into one increment.
%%
%% Every element emits a trace point and adds its N to the running total; any
%% element of another shape raises
%% {mixed_requests_not_allowed, {inc_counter, Element}}. The summed increment
%% is then issued through on_query/3 (CallMode = sync) or on_query_async/4
%% (CallMode = {async, ReplyFunAndArgs}).
batch_inc_counter(CallMode, InstId, BatchReq, State) ->
    AddOne = fun
        ({inc_counter, N}, Acc) ->
            ?tp(connector_demo_batch_inc_individual, #{n => N}),
            Acc + N;
        (Other, _Acc) ->
            erlang:error({mixed_requests_not_allowed, {inc_counter, Other}})
    end,
    Merged = {inc_counter, lists:foldl(AddOne, 0, BatchReq)},
    case CallMode of
        {async, ReplyFunAndArgs} ->
            on_query_async(InstId, Merged, ReplyFunAndArgs, State);
        sync ->
            on_query(InstId, Merged, State)
    end.
%% Answers a 'get_counter' batch with a single get_counter query, issued
%% through on_query_async/4 for {async, ReplyFunAndArgs} or on_query/3 for
%% sync.
batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
    on_query_async(InstId, get_counter, ReplyFunAndArgs, State);
batch_get_counter(sync, InstId, State) ->
    on_query(InstId, get_counter, State).
%% emqx_resource callback: health check.
%%
%% * A state with `health_check_error := true' reports `disconnected'
%%   immediately.
%% * Otherwise the check sleeps 300 ms and then reports `connected' when the
%%   counter process is alive, `disconnected' when it is not.
%%   NOTE(review): the sleep presumably simulates a slow health check --
%%   confirm against the test suites using this module.
on_get_status(_InstId, #{health_check_error := true}) ->
    disconnected;
on_get_status(_InstId, #{pid := CounterPid}) ->
    ok = timer:sleep(300),
    IsAlive = erlang:is_process_alive(CounterPid),
    case IsAlive of
        false -> disconnected;
        true -> connected
    end.
%% Spawns the counter process (running counter_loop/0) linked to the caller,
%% registers it under Name when Register is true, and returns its pid.
spawn_counter_process(Name, Register) ->
    CounterPid = erlang:spawn_link(?MODULE, counter_loop, []),
    true = maybe_register(Name, CounterPid, Register),
    CounterPid.
%% Entry point of the counter process: enters the receive loop with a zero
%% counter, status `running' and no recorded incorrect-status requests.
counter_loop() ->
    InitialState = #{
        status => running,
        counter => 0,
        incorrect_status_count => 0
    },
    counter_loop(InitialState).
%% Receive loop of the demo counter process.
%%
%% State is a map with three keys:
%%   counter                - the running total
%%   status                 - `running' or `blocked'
%%   incorrect_status_count - number of synchronous increments rejected
%%                            while blocked
%%
%% Each iteration handles exactly one message, computes the next state and
%% recurses; the function never returns. Replies to synchronous requests are
%% sent as {ReqRef, Reply} to FromPid; asynchronous requests are answered
%% through apply_reply/2.
counter_loop(
#{
counter := Num,
status := Status,
incorrect_status_count := IncorrectCount
} = State
) ->
NewState =
receive
%% Stop accepting increments until `resume' arrives.
block ->
ct:pal("counter recv: ~p", [block]),
State#{status => blocked};
%% Accept increments again; logs how many messages are still sitting in
%% the mailbox at this point.
resume ->
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
State#{status => running};
%% Asynchronous increment. The guard makes this clause match only while
%% running; no other clause matches this message shape, so while blocked
%% these messages are left in the mailbox until the status is `running'
%% again.
{inc, N, ReplyFun} when Status == running ->
%ct:pal("async counter recv: ~p", [{inc, N}]),
apply_reply(ReplyFun, ok),
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
%% Synchronous increment while running: acknowledge and add N.
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok},
State#{counter => Num + N};
%% Synchronous increment while blocked: reject it, leave the counter
%% untouched and count the rejection.
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
%% Asynchronous read of the counter; answered in either status.
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;
%% Synchronous read of the rejection count; answered in either status.
{{FromPid, ReqRef}, get_incorrect_status_count} ->
FromPid ! {ReqRef, IncorrectCount},
State;
%% Synchronous read of the counter; answered in either status.
{{FromPid, ReqRef}, get} ->
FromPid ! {ReqRef, Num},
State
end,
counter_loop(NewState).
%% Registers Pid under Name when the third argument is true, logging the
%% name and the current whereis/1 result first; does nothing when it is
%% false. Returns true in both cases (erlang:register/2 raises on failure).
maybe_register(_Name, _Pid, false) ->
    true;
maybe_register(Name, Pid, true) ->
    ct:pal("---- Register Name: ~p", [Name]),
    CurrentOwner = erlang:whereis(Name),
    ct:pal("---- whereis(): ~p", [CurrentOwner]),
    erlang:register(Name, Pid).
%% Invokes a {ReplyFun, Args} reply callback with Result appended as the last
%% argument and returns whatever ReplyFun returns.
apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
    FullArgs = lists:append(Args, [Result]),
    erlang:apply(ReplyFun, FullArgs).