emqx/apps/emqx_resource/test/emqx_connector_demo.erl

450 lines
15 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_demo).
-include_lib("typerefl/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2
]).
-export([counter_loop/0, set_callback_mode/1]).
%% callbacks for emqx_resource config schema
-export([roots/0]).
-define(CM_KEY, {?MODULE, callback_mode}).
roots() ->
[
{name, fun name/1},
{register, fun register/1}
].
name(type) -> atom();
name(required) -> true;
name(_) -> undefined.
register(type) -> boolean();
register(required) -> true;
register(default) -> false;
register(_) -> undefined.
callback_mode() ->
persistent_term:get(?CM_KEY).
set_callback_mode(Mode) ->
persistent_term:put(?CM_KEY, Mode).
on_start(_InstId, #{create_error := true}) ->
?tp(connector_demo_start_error, #{}),
error("some error");
on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false),
StopError = maps:get(stop_error, Opts, false),
{ok, Opts#{
id => InstId,
stop_error => StopError,
pid => spawn_counter_process(Name, Register)
}}.
on_stop(_InstId, #{stop_error := true}) ->
{error, stop_error};
on_stop(_InstId, #{pid := Pid}) ->
stop_counter_process(Pid).
on_query(_InstId, get_state, State) ->
{ok, State};
on_query(_InstId, get_state_failed, State) ->
{error, State};
on_query(_InstId, block, #{pid := Pid}) ->
Pid ! block,
ok;
on_query(_InstId, block_now, #{pid := Pid}) ->
Pid ! block,
{error, {resource_error, #{reason => blocked, msg => blocked}}};
on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume,
ok;
on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {big_payload, Payload}},
receive
{ReqRef, ok} ->
?tp(connector_demo_big_payload, #{payload => Payload}),
ok;
{ReqRef, incorrect_status} ->
{error, {recoverable_error, incorrect_status}}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {inc, N}},
receive
{ReqRef, ok} ->
?tp(connector_demo_inc_counter, #{n => N}),
ok;
{ReqRef, incorrect_status} ->
{error, {recoverable_error, incorrect_status}}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, get_incorrect_status_count, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, get_incorrect_status_count},
receive
{ReqRef, Count} -> {ok, Count}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, get_counter, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, get},
receive
{ReqRef, Num} -> {ok, Num}
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {individual_reply, IsSuccess}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {individual_reply, IsSuccess}},
receive
{ReqRef, Res} -> Res
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
?tp(connector_demo_sleep, #{mode => sync, for => For}),
ReqRef = make_ref(),
From = {self(), ReqRef},
Pid ! {From, {sleep_before_reply, For}},
receive
{ReqRef, Result} ->
Result
after 1000 ->
{error, timeout}
end;
on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
%% This simulates a slow sync call
timer:sleep(SleepFor),
{ok, slept}.
on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
Pid ! {block, ReplyFun},
{ok, Pid};
on_query_async(_InstId, resume, ReplyFun, #{pid := Pid}) ->
Pid ! {resume, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
Pid ! {inc, N, ReplyFun},
{ok, Pid};
on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
Pid ! {get, ReplyFun},
{ok, Pid};
on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
Pid ! {block_now, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
Pid ! {big_payload, Payload, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {individual_reply, IsSuccess}, ReplyFun, #{pid := Pid}) ->
Pid ! {individual_reply, IsSuccess, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) ->
?tp(connector_demo_sleep, #{mode => async, for => For}),
Pid ! {{sleep_before_reply, For}, ReplyFun},
{ok, Pid}.
on_batch_query(InstId, BatchReq, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but
%% cannot be mixed.
case hd(BatchReq) of
{inc_counter, _} ->
batch_inc_counter(sync, InstId, BatchReq, State);
get_counter ->
batch_get_counter(sync, InstId, State);
{big_payload, _Payload} ->
batch_big_payload(sync, InstId, BatchReq, State);
{individual_reply, _IsSuccess} ->
batch_individual_reply(sync, InstId, BatchReq, State);
{random_reply, Num} ->
%% async batch retried
make_random_reply(Num)
end.
on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) ->
%% Requests can be of multiple types, but cannot be mixed.
case hd(BatchReq) of
{inc_counter, _} ->
batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State);
get_counter ->
batch_get_counter({async, ReplyFunAndArgs}, InstId, State);
block_now ->
on_query_async(InstId, block_now, ReplyFunAndArgs, State);
{big_payload, _Payload} ->
batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State);
{individual_reply, _IsSuccess} ->
batch_individual_reply({async, ReplyFunAndArgs}, InstId, BatchReq, State);
{random_reply, Num} ->
%% only take the first Num in the batch should be random enough
Pid ! {{random_reply, Num}, ReplyFunAndArgs},
{ok, Pid}
end.
batch_inc_counter(CallMode, InstId, BatchReq, State) ->
TotalN = lists:foldl(
fun
({inc_counter, N}, Total) ->
?tp(connector_demo_batch_inc_individual, #{n => N}),
Total + N;
(Req, _Total) ->
error({mixed_requests_not_allowed, {inc_counter, Req}})
end,
0,
BatchReq
),
case CallMode of
sync ->
on_query(InstId, {inc_counter, TotalN}, State);
{async, ReplyFunAndArgs} ->
on_query_async(InstId, {inc_counter, TotalN}, ReplyFunAndArgs, State)
end.
batch_get_counter(sync, InstId, State) ->
on_query(InstId, get_counter, State);
batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
batch_big_payload(sync, InstId, Batch, State) ->
[Res | _] = lists:map(
fun(Req = {big_payload, _}) -> on_query(InstId, Req, State) end,
Batch
),
Res;
batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) ->
lists:foreach(
fun(Req = {big_payload, _}) -> on_query_async(InstId, Req, ReplyFunAndArgs, State) end,
Batch
),
{ok, Pid}.
batch_individual_reply(sync, InstId, Batch, State) ->
lists:map(
fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
Batch
);
batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) ->
Pid = spawn(fun() ->
Results = lists:map(
fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
Batch
),
apply_reply(ReplyFunAndArgs, Results)
end),
{ok, Pid}.
on_get_status(_InstId, #{health_check_error := true}) ->
?tp(connector_demo_health_check_error, #{}),
disconnected;
on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
?tp(connector_demo_health_check_error, #{}),
{disconnected, State, Message};
on_get_status(_InstId, #{pid := Pid}) ->
timer:sleep(300),
case is_process_alive(Pid) of
true -> connected;
false -> disconnected
end.
spawn_counter_process(Name, Register) ->
Pid = spawn_link(?MODULE, counter_loop, []),
true = maybe_register(Name, Pid, Register),
Pid.
stop_counter_process(Pid) ->
true = erlang:is_process_alive(Pid),
true = erlang:exit(Pid, shutdown),
receive
{'EXIT', Pid, shutdown} -> ok
after 5000 ->
{error, timeout}
end.
counter_loop() ->
counter_loop(#{
counter => 0,
status => running,
incorrect_status_count => 0
}).
counter_loop(
#{
counter := Num,
status := Status,
incorrect_status_count := IncorrectCount
} = State
) ->
NewState =
receive
block ->
ct:pal("counter recv: ~p", [block]),
State#{status => blocked};
{block, ReplyFun} ->
ct:pal("counter recv: ~p", [block]),
apply_reply(ReplyFun, ok),
State#{status => blocked};
{block_now, ReplyFun} ->
ct:pal("counter recv: ~p", [block_now]),
apply_reply(
ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}}
),
State#{status => blocked};
resume ->
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
State#{status => running};
{resume, ReplyFun} ->
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
apply_reply(ReplyFun, ok),
State#{status => running};
{inc, N, ReplyFun} when Status == running ->
%ct:pal("async counter recv: ~p", [{inc, N}]),
apply_reply(ReplyFun, ok),
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
{big_payload, _Payload, ReplyFun} when Status == blocked ->
apply_reply(ReplyFun, {error, {recoverable_error, blocked}}),
State;
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok},
State#{counter => Num + N};
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running ->
FromPid ! {ReqRef, ok},
State;
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;
{{FromPid, ReqRef}, get_incorrect_status_count} ->
FromPid ! {ReqRef, IncorrectCount},
State;
{{FromPid, ReqRef}, get} ->
FromPid ! {ReqRef, Num},
State;
{{FromPid, ReqRef}, {individual_reply, IsSuccess}} ->
Res =
case IsSuccess of
true -> ok;
false -> {error, {unrecoverable_error, bad_request}}
end,
FromPid ! {ReqRef, Res},
State;
{individual_reply, IsSuccess, ReplyFun} ->
Res =
case IsSuccess of
true -> ok;
false -> {error, {unrecoverable_error, bad_request}}
end,
apply_reply(ReplyFun, Res),
State;
{{random_reply, RandNum}, ReplyFun} ->
%% usually a behaving connector should reply once and only once for
%% each (batch) request
%% but we try to reply random results a random number of times
%% with 'ok' in the result, the buffer worker should eventually
%% drain the buffer (and inflights table)
ReplyCount = 1 + (RandNum rem 3),
Results = make_random_replies(ReplyCount),
%% add a delay to trigger inflight full
lists:foreach(
fun(Result) ->
timer:sleep(rand:uniform(5)),
apply_reply(ReplyFun, Result)
end,
Results
),
State;
{{sleep_before_reply, _} = SleepQ, ReplyFun} ->
apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
State;
{{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} ->
FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
State
end,
counter_loop(NewState).
handle_query(Mode, {sleep_before_reply, For} = Query, Status) ->
ok = timer:sleep(For),
Result =
case Status of
running -> ok;
blocked -> {error, {recoverable_error, blocked}}
end,
?tp(connector_demo_sleep_handled, #{
mode => Mode, query => Query, slept => For, result => Result
}),
Result.
maybe_register(Name, Pid, true) ->
ct:pal("---- Register Name: ~p", [Name]),
ct:pal("---- whereis(): ~p", [whereis(Name)]),
erlang:register(Name, Pid);
maybe_register(_Name, _Pid, false) ->
true.
apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
apply(ReplyFun, Args ++ [Result]).
make_random_replies(0) ->
[];
make_random_replies(N) ->
[make_random_reply(N) | make_random_replies(N - 1)].
make_random_reply(N) ->
case rand:uniform(3) of
1 ->
{ok, N};
2 ->
{error, {recoverable_error, N}};
3 ->
{error, {unrecoverable_error, N}}
end.