refactor(stomp): remove transaction module
This commit is contained in:
parent
1263a05bbc
commit
713b4c7804
|
@ -14,5 +14,3 @@
|
||||||
{cover_enabled, true}.
|
{cover_enabled, true}.
|
||||||
{cover_opts, [verbose]}.
|
{cover_opts, [verbose]}.
|
||||||
{cover_export_enabled, true}.
|
{cover_export_enabled, true}.
|
||||||
|
|
||||||
{plugins, [coveralls]}.
|
|
||||||
|
|
|
@ -127,10 +127,6 @@ handle_info(timeout, State) ->
|
||||||
handle_info({shutdown, Reason}, State) ->
|
handle_info({shutdown, Reason}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
handle_info({transaction, {timeout, Id}}, State) ->
|
|
||||||
emqx_stomp_transaction:timeout(Id),
|
|
||||||
noreply(State);
|
|
||||||
|
|
||||||
handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming;
|
handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming;
|
||||||
TMsg =:= outgoing ->
|
TMsg =:= outgoing ->
|
||||||
|
|
||||||
|
@ -145,6 +141,9 @@ handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming;
|
||||||
shutdown({sock_error, Reason}, State)
|
shutdown({sock_error, Reason}, State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
handle_info({timeout, TRef, TMsg}, State) ->
|
||||||
|
with_proto(timeout, [TRef, TMsg], State);
|
||||||
|
|
||||||
handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) ->
|
handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) ->
|
||||||
stop(Error, State);
|
stop(Error, State);
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,12 @@
|
||||||
, timeout/3
|
, timeout/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% for trans callback
|
||||||
|
-export([ handle_recv_send_frame/2
|
||||||
|
, handle_recv_ack_frame/2
|
||||||
|
, handle_recv_nack_frame/2
|
||||||
|
]).
|
||||||
|
|
||||||
-record(pstate, {
|
-record(pstate, {
|
||||||
peername,
|
peername,
|
||||||
heartfun,
|
heartfun,
|
||||||
|
@ -50,14 +56,18 @@
|
||||||
allow_anonymous,
|
allow_anonymous,
|
||||||
default_user,
|
default_user,
|
||||||
subscriptions = [],
|
subscriptions = [],
|
||||||
timers :: #{atom() => disable | undefined | reference()}
|
timers :: #{atom() => disable | undefined | reference()},
|
||||||
|
transaction :: #{binary() => list()}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
incoming_timer => incoming,
|
incoming_timer => incoming,
|
||||||
outgoing_timer => outgoing
|
outgoing_timer => outgoing,
|
||||||
|
clean_trans_timer => clean_trans
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(TRANS_TIMEOUT, 60000).
|
||||||
|
|
||||||
-type(pstate() :: #pstate{}).
|
-type(pstate() :: #pstate{}).
|
||||||
|
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
|
@ -70,6 +80,7 @@ init(#{peername := Peername,
|
||||||
heartfun = HeartFun,
|
heartfun = HeartFun,
|
||||||
sendfun = SendFun,
|
sendfun = SendFun,
|
||||||
timers = #{},
|
timers = #{},
|
||||||
|
transaction = #{},
|
||||||
allow_anonymous = AllowAnonymous,
|
allow_anonymous = AllowAnonymous,
|
||||||
default_user = DefaultUser}.
|
default_user = DefaultUser}.
|
||||||
|
|
||||||
|
@ -121,18 +132,10 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
||||||
received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) ->
|
received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) ->
|
||||||
{error, unexpected_connect, State};
|
{error, unexpected_connect, State};
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
||||||
Topic = header(<<"destination">>, Headers),
|
|
||||||
Action = fun(State0) ->
|
|
||||||
maybe_send_receipt(receipt_id(Headers), State0),
|
|
||||||
emqx_broker:publish(
|
|
||||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
|
||||||
),
|
|
||||||
State0
|
|
||||||
end,
|
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, Action(State)};
|
undefined -> {ok, handle_recv_send_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, Action, receipt_id(Headers), State)
|
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||||
|
@ -167,15 +170,10 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
||||||
%% transaction:tx1
|
%% transaction:tx1
|
||||||
%%
|
%%
|
||||||
%% ^@
|
%% ^@
|
||||||
received(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
Id = header(<<"id">>, Headers),
|
|
||||||
Action = fun(State0) ->
|
|
||||||
maybe_send_receipt(receipt_id(Headers), State0),
|
|
||||||
ack(Id, State0)
|
|
||||||
end,
|
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, Action(State)};
|
undefined -> {ok, handle_recv_ack_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, Action, receipt_id(Headers), State)
|
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% NACK
|
%% NACK
|
||||||
|
@ -183,29 +181,25 @@ received(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
%% transaction:tx1
|
%% transaction:tx1
|
||||||
%%
|
%%
|
||||||
%% ^@
|
%% ^@
|
||||||
received(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
||||||
Id = header(<<"id">>, Headers),
|
|
||||||
Action = fun(State0) ->
|
|
||||||
maybe_send_receipt(receipt_id(Headers), State0),
|
|
||||||
nack(Id, State0)
|
|
||||||
end,
|
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, Action(State)};
|
undefined -> {ok, handle_recv_nack_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, Action, receipt_id(Headers), State)
|
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% BEGIN
|
%% BEGIN
|
||||||
%% transaction:tx1
|
%% transaction:tx1
|
||||||
%%
|
%%
|
||||||
%% ^@
|
%% ^@
|
||||||
received(#stomp_frame{command = <<"BEGIN">>, headers = Headers}, State) ->
|
received(#stomp_frame{command = <<"BEGIN">>, headers = Headers},
|
||||||
Id = header(<<"transaction">>, Headers),
|
State = #pstate{transaction = Trans}) ->
|
||||||
%% self() ! TimeoutMsg
|
Id = header(<<"transaction">>, Headers),
|
||||||
TimeoutMsg = {transaction, {timeout, Id}},
|
case maps:get(Id, Trans, undefined) of
|
||||||
case emqx_stomp_transaction:start(Id, TimeoutMsg) of
|
undefined ->
|
||||||
{ok, _Transaction} ->
|
Ts = erlang:system_time(millisecond),
|
||||||
maybe_send_receipt(receipt_id(Headers), State);
|
NState = ensure_clean_trans_timer(State#pstate{transaction = Trans#{Id => {Ts, []}}}),
|
||||||
{error, already_started} ->
|
maybe_send_receipt(receipt_id(Headers), NState);
|
||||||
|
_ ->
|
||||||
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " already started"]), State)
|
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " already started"]), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -213,12 +207,16 @@ received(#stomp_frame{command = <<"BEGIN">>, headers = Headers}, State) ->
|
||||||
%% transaction:tx1
|
%% transaction:tx1
|
||||||
%%
|
%%
|
||||||
%% ^@
|
%% ^@
|
||||||
received(#stomp_frame{command = <<"COMMIT">>, headers = Headers}, State) ->
|
received(#stomp_frame{command = <<"COMMIT">>, headers = Headers},
|
||||||
|
State = #pstate{transaction = Trans}) ->
|
||||||
Id = header(<<"transaction">>, Headers),
|
Id = header(<<"transaction">>, Headers),
|
||||||
case emqx_stomp_transaction:commit(Id, State) of
|
case maps:get(Id, Trans, undefined) of
|
||||||
{ok, NState} ->
|
{_, Actions} ->
|
||||||
|
NState = lists:foldr(fun({Func, Args}, S) ->
|
||||||
|
erlang:apply(Func, Args ++ [S])
|
||||||
|
end, State#pstate{transaction = maps:remove(Id, Trans)}, Actions),
|
||||||
maybe_send_receipt(receipt_id(Headers), NState);
|
maybe_send_receipt(receipt_id(Headers), NState);
|
||||||
{error, not_found} ->
|
_ ->
|
||||||
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " not found"]), State)
|
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " not found"]), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -226,12 +224,14 @@ received(#stomp_frame{command = <<"COMMIT">>, headers = Headers}, State) ->
|
||||||
%% transaction:tx1
|
%% transaction:tx1
|
||||||
%%
|
%%
|
||||||
%% ^@
|
%% ^@
|
||||||
received(#stomp_frame{command = <<"ABORT">>, headers = Headers}, State) ->
|
received(#stomp_frame{command = <<"ABORT">>, headers = Headers},
|
||||||
|
State = #pstate{transaction = Trans}) ->
|
||||||
Id = header(<<"transaction">>, Headers),
|
Id = header(<<"transaction">>, Headers),
|
||||||
case emqx_stomp_transaction:abort(Id) of
|
case maps:get(Id, Trans, undefined) of
|
||||||
ok ->
|
{_, _Actions} ->
|
||||||
maybe_send_receipt(receipt_id(Headers), State);
|
NState = State#pstate{transaction = maps:remove(Id, Trans)},
|
||||||
{error, not_found} ->
|
maybe_send_receipt(receipt_id(Headers), NState);
|
||||||
|
_ ->
|
||||||
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " not found"]), State)
|
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " not found"]), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -247,8 +247,8 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
||||||
{<<"message-id">>, next_msgid()},
|
{<<"message-id">>, next_msgid()},
|
||||||
{<<"destination">>, Topic},
|
{<<"destination">>, Topic},
|
||||||
{<<"content-type">>, <<"text/plain">>}],
|
{<<"content-type">>, <<"text/plain">>}],
|
||||||
Headers1 = case Ack of
|
Headers1 = case Ack of
|
||||||
_ when Ack =:= <<"client">> orelse Ack =:= <<"client-individual">> ->
|
_ when Ack =:= <<"client">> orelse Ack =:= <<"client-individual">> ->
|
||||||
Headers0 ++ [{<<"ack">>, next_ackid()}];
|
Headers0 ++ [{<<"ack">>, next_ackid()}];
|
||||||
_ ->
|
_ ->
|
||||||
Headers0
|
Headers0
|
||||||
|
@ -290,7 +290,12 @@ timeout(_TRef, {outgoing, NewVal},
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{ok, NHrtBt} ->
|
{ok, NHrtBt} ->
|
||||||
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
|
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
|
timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) ->
|
||||||
|
Now = erlang:system_time(millisecond),
|
||||||
|
NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans),
|
||||||
|
{ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}.
|
||||||
|
|
||||||
negotiate_version(undefined) ->
|
negotiate_version(undefined) ->
|
||||||
{ok, <<"1.0">>};
|
{ok, <<"1.0">>};
|
||||||
|
@ -318,11 +323,12 @@ check_login(Login, Passcode, _, DefaultUser) ->
|
||||||
{_, _ } -> false
|
{_, _ } -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_action(Id, Action, ReceiptId, State) ->
|
add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) ->
|
||||||
case emqx_stomp_transaction:add(Id, Action) of
|
case maps:get(Id, Trans, undefined) of
|
||||||
{ok, _} ->
|
{Ts, Actions} ->
|
||||||
{ok, State};
|
NTrans = Trans#{Id => {Ts, [Action|Actions]}},
|
||||||
{error, not_found} ->
|
{ok, State#pstate{transaction = NTrans}};
|
||||||
|
_ ->
|
||||||
send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
|
send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -331,7 +337,7 @@ maybe_send_receipt(undefined, State) ->
|
||||||
maybe_send_receipt(ReceiptId, State) ->
|
maybe_send_receipt(ReceiptId, State) ->
|
||||||
send(receipt_frame(ReceiptId), State).
|
send(receipt_frame(ReceiptId), State).
|
||||||
|
|
||||||
ack(_Id, State) ->
|
ack(_Id, State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
nack(_Id, State) -> State.
|
nack(_Id, State) -> State.
|
||||||
|
@ -360,7 +366,7 @@ next_msgid() ->
|
||||||
undefined -> 1;
|
undefined -> 1;
|
||||||
I -> I
|
I -> I
|
||||||
end,
|
end,
|
||||||
put(msgid, MsgId + 1),
|
put(msgid, MsgId + 1),
|
||||||
MsgId.
|
MsgId.
|
||||||
|
|
||||||
next_ackid() ->
|
next_ackid() ->
|
||||||
|
@ -368,16 +374,16 @@ next_ackid() ->
|
||||||
undefined -> 1;
|
undefined -> 1;
|
||||||
I -> I
|
I -> I
|
||||||
end,
|
end,
|
||||||
put(ackid, AckId + 1),
|
put(ackid, AckId + 1),
|
||||||
AckId.
|
AckId.
|
||||||
|
|
||||||
make_mqtt_message(Topic, Headers, Body) ->
|
make_mqtt_message(Topic, Headers, Body) ->
|
||||||
Msg = emqx_message:make(stomp, Topic, Body),
|
Msg = emqx_message:make(stomp, Topic, Body),
|
||||||
Headers1 = lists:foldl(fun(Key, Headers0) ->
|
Headers1 = lists:foldl(fun(Key, Headers0) ->
|
||||||
proplists:delete(Key, Headers0)
|
proplists:delete(Key, Headers0)
|
||||||
end, Headers, [<<"destination">>,
|
end, Headers, [<<"destination">>,
|
||||||
<<"content-length">>,
|
<<"content-length">>,
|
||||||
<<"content-type">>,
|
<<"content-type">>,
|
||||||
<<"transaction">>,
|
<<"transaction">>,
|
||||||
<<"receipt">>]),
|
<<"receipt">>]),
|
||||||
emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
|
emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
|
||||||
|
@ -385,6 +391,33 @@ make_mqtt_message(Topic, Headers, Body) ->
|
||||||
receipt_id(Headers) ->
|
receipt_id(Headers) ->
|
||||||
header(<<"receipt">>, Headers).
|
header(<<"receipt">>, Headers).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Transaction Handle
|
||||||
|
|
||||||
|
handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
||||||
|
Topic = header(<<"destination">>, Headers),
|
||||||
|
maybe_send_receipt(receipt_id(Headers), State),
|
||||||
|
emqx_broker:publish(
|
||||||
|
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
||||||
|
),
|
||||||
|
State.
|
||||||
|
|
||||||
|
handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
|
Id = header(<<"id">>, Headers),
|
||||||
|
maybe_send_receipt(receipt_id(Headers), State),
|
||||||
|
ack(Id, State).
|
||||||
|
|
||||||
|
handle_recv_nack_frame(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
||||||
|
Id = header(<<"id">>, Headers),
|
||||||
|
maybe_send_receipt(receipt_id(Headers), State),
|
||||||
|
nack(Id, State).
|
||||||
|
|
||||||
|
ensure_clean_trans_timer(State = #pstate{transaction = Trans}) ->
|
||||||
|
case maps:size(Trans) of
|
||||||
|
0 -> State;
|
||||||
|
_ -> ensure_timer(clean_trans_timer, State)
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Heartbeat
|
%% Heartbeat
|
||||||
|
|
||||||
|
@ -433,4 +466,7 @@ clean_timer(Name, State = #pstate{timers = Timers}) ->
|
||||||
interval(incoming_timer, #pstate{heart_beats = HrtBt}) ->
|
interval(incoming_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(incoming, HrtBt);
|
emqx_stomp_heartbeat:interval(incoming, HrtBt);
|
||||||
interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(outgoing, HrtBt).
|
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
||||||
|
interval(clean_trans_timer, _) ->
|
||||||
|
?TRANS_TIMEOUT.
|
||||||
|
|
||||||
|
|
|
@ -1,77 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Stomp Transaction
|
|
||||||
|
|
||||||
-module(emqx_stomp_transaction).
|
|
||||||
|
|
||||||
-include("emqx_stomp.hrl").
|
|
||||||
|
|
||||||
-export([ start/2
|
|
||||||
, add/2
|
|
||||||
, commit/2
|
|
||||||
, abort/1
|
|
||||||
, timeout/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(transaction, {id, actions, tref}).
|
|
||||||
|
|
||||||
-define(TIMEOUT, 60000).
|
|
||||||
|
|
||||||
start(Id, TimeoutMsg) ->
|
|
||||||
case get({transaction, Id}) of
|
|
||||||
undefined ->
|
|
||||||
TRef = erlang:send_after(?TIMEOUT, self(), TimeoutMsg),
|
|
||||||
Transaction = #transaction{id = Id, actions = [], tref = TRef},
|
|
||||||
put({transaction, Id}, Transaction),
|
|
||||||
{ok, Transaction};
|
|
||||||
_Transaction ->
|
|
||||||
{error, already_started}
|
|
||||||
end.
|
|
||||||
|
|
||||||
add(Id, Action) ->
|
|
||||||
Fun = fun(Transaction = #transaction{actions = Actions}) ->
|
|
||||||
Transaction1 = Transaction#transaction{actions = [Action | Actions]},
|
|
||||||
put({transaction, Id}, Transaction1),
|
|
||||||
{ok, Transaction1}
|
|
||||||
end,
|
|
||||||
with_transaction(Id, Fun).
|
|
||||||
|
|
||||||
commit(Id, InitState) ->
|
|
||||||
Fun = fun(Transaction = #transaction{actions = Actions}) ->
|
|
||||||
done(Transaction),
|
|
||||||
{ok, lists:foldr(fun(Action, State) -> Action(State) end,
|
|
||||||
InitState, Actions)}
|
|
||||||
end,
|
|
||||||
with_transaction(Id, Fun).
|
|
||||||
|
|
||||||
abort(Id) ->
|
|
||||||
with_transaction(Id, fun done/1).
|
|
||||||
|
|
||||||
timeout(Id) ->
|
|
||||||
erase({transaction, Id}).
|
|
||||||
|
|
||||||
done(#transaction{id = Id, tref = TRef}) ->
|
|
||||||
erase({transaction, Id}),
|
|
||||||
catch erlang:cancel_timer(TRef),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
with_transaction(Id, Fun) ->
|
|
||||||
case get({transaction, Id}) of
|
|
||||||
undefined -> {error, not_found};
|
|
||||||
Transaction -> Fun(Transaction)
|
|
||||||
end.
|
|
||||||
|
|
Loading…
Reference in New Issue