chore(dashboard): make dirty operation into ekka_mnesia transation
This commit is contained in:
parent
cae79a0584
commit
776604a90b
|
@ -139,7 +139,10 @@ update_pwd(Username, Fun) ->
|
|||
|
||||
|
||||
-spec(lookup_user(binary()) -> [mqtt_admin()]).
|
||||
lookup_user(Username) when is_binary(Username) -> mnesia:dirty_read(mqtt_admin, Username).
|
||||
lookup_user(Username) when is_binary(Username) ->
|
||||
Fun = fun() -> mnesia:read(mqtt_admin, Username) end,
|
||||
{atomic, User} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun),
|
||||
User.
|
||||
|
||||
-spec(all_users() -> [#mqtt_admin{}]).
|
||||
all_users() -> ets:tab2list(mqtt_admin).
|
||||
|
|
|
@ -162,7 +162,8 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
|
|||
diff(Sent, Sent0),
|
||||
diff(Dropped, Dropped0)},
|
||||
Ts = get_local_time(),
|
||||
_ = mnesia:dirty_write(emqx_collect, #mqtt_collect{timestamp = Ts, collect = Collect}),
|
||||
ekka_mnesia:transaction(ekka_mnesia:local_content_shard(),
|
||||
fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]),
|
||||
{Received, Sent, Dropped}.
|
||||
|
||||
avg(Items) ->
|
||||
|
|
|
@ -103,7 +103,8 @@ do_sign(Username, Password) ->
|
|||
},
|
||||
Signed = jose_jwt:sign(JWK, JWS, JWT),
|
||||
{_, Token} = jose_jws:compact(Signed),
|
||||
ok = ekka_mnesia:dirty_write(format(Token, Username, ExpTime)),
|
||||
JWTRec = format(Token, Username, ExpTime),
|
||||
ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
|
||||
{ok, Token}.
|
||||
|
||||
do_verify(Token)->
|
||||
|
@ -111,8 +112,9 @@ do_verify(Token)->
|
|||
{ok, JWT = #mqtt_admin_jwt{exptime = ExpTime}} ->
|
||||
case ExpTime > erlang:system_time(millisecond) of
|
||||
true ->
|
||||
ekka_mnesia:dirty_write(JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}),
|
||||
ok;
|
||||
NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()},
|
||||
{atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]),
|
||||
Res;
|
||||
_ ->
|
||||
{error, token_timeout}
|
||||
end;
|
||||
|
@ -132,14 +134,18 @@ do_destroy_by_username(Username) ->
|
|||
%% jwt internal util function
|
||||
-spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}).
|
||||
lookup(Token) ->
|
||||
case mnesia:dirty_read(?TAB, Token) of
|
||||
[JWT] -> {ok, JWT};
|
||||
[] -> {error, not_found}
|
||||
Fun = fun() -> mnesia:read(?TAB, Token) end,
|
||||
case ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun) of
|
||||
{atomic, [JWT]} -> {ok, JWT};
|
||||
{atomic, []} -> {error, not_found}
|
||||
end.
|
||||
|
||||
lookup_by_username(Username) ->
|
||||
Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}],
|
||||
mnesia:dirty_select(?TAB, Spec).
|
||||
Fun = fun() -> mnesia:select(?TAB, Spec) end,
|
||||
{atomic, List} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun),
|
||||
List.
|
||||
|
||||
|
||||
jwk(Username, Password, Salt) ->
|
||||
Key = erlang:md5(<<Salt/binary, Username/binary, Password/binary>>),
|
||||
|
@ -187,7 +193,8 @@ handle_info(clean_jwt, State) ->
|
|||
timer_clean(self()),
|
||||
Now = erlang:system_time(millisecond),
|
||||
Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}],
|
||||
JWTList = mnesia:dirty_select(?TAB, Spec),
|
||||
{atomic, JWTList} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD,
|
||||
fun() -> mnesia:select(?TAB, Spec) end),
|
||||
destroy(JWTList),
|
||||
{noreply, State};
|
||||
handle_info(_Info, State) ->
|
||||
|
|
|
@ -18,8 +18,7 @@
|
|||
|
||||
%% Check for the mnesia calls forbidden by Ekka:
|
||||
{xref_queries,
|
||||
[ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", [{{emqx_dashboard_collection,flush,2},{mnesia,dirty_write,2}}]}
|
||||
, {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
|
||||
[ {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
|
||||
, {"E || \"mnesia\":\"transaction\"/\".*\" : Fun", []}
|
||||
, {"E || \"mnesia\":\"async_dirty\"/\".*\" : Fun", []}
|
||||
, {"E || \"mnesia\":\"clear_table\"/\".*\" : Fun", []}
|
||||
|
|
Loading…
Reference in New Issue