commit
5700118af1
12
Makefile
12
Makefile
|
@ -4,12 +4,12 @@ PROJECT_VERSION = 2.0
|
|||
|
||||
DEPS = gproc lager gen_logger gen_conf esockd mochiweb
|
||||
|
||||
dep_gproc = git https://github.com/uwiger/gproc.git
|
||||
dep_lager = git https://github.com/basho/lager.git
|
||||
dep_gen_conf = git https://github.com/emqtt/gen_conf.git
|
||||
dep_gen_logger = git https://github.com/emqtt/gen_logger.git
|
||||
dep_esockd = git https://github.com/emqtt/esockd.git emq20
|
||||
dep_mochiweb = git https://github.com/emqtt/mochiweb.git
|
||||
dep_gproc = git https://github.com/uwiger/gproc
|
||||
dep_lager = git https://github.com/basho/lager
|
||||
dep_gen_conf = git https://github.com/emqtt/gen_conf
|
||||
dep_gen_logger = git https://github.com/emqtt/gen_logger
|
||||
dep_esockd = git https://github.com/emqtt/esockd emq20
|
||||
dep_mochiweb = git https://github.com/emqtt/mochiweb
|
||||
|
||||
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
|
|
|
@ -5,6 +5,27 @@
|
|||
Changes
|
||||
=======
|
||||
|
||||
.. _release_2.0_rc.1:
|
||||
|
||||
----------------
|
||||
Version 2.0-rc.1
|
||||
----------------
|
||||
|
||||
*Release Date: 2016-10-03*
|
||||
|
||||
1. `mqtt/superuser` POST called two times in `emqtt_auth_http` (#696)
|
||||
|
||||
2. Close MQTT TCP connection if authentication failed (#707)
|
||||
|
||||
3. Improve the plugin management. Developer don't need to add plugin's config to rel/sys.config
|
||||
|
||||
4. Add `BUILD_DEPS` in the plugin's Makefile::
|
||||
|
||||
BUILD_DEPS = emqttd
|
||||
dep_emqttd = git https://github.com/emqtt/emqttd emq20
|
||||
|
||||
5. Improve the design of Redis ACL.
|
||||
|
||||
.. _release_2.0_beta.3:
|
||||
|
||||
------------------
|
||||
|
|
|
@ -415,26 +415,25 @@ etc/plugins/emqttd_auth_redis.conf:
|
|||
|
||||
%% Variables: %u = username, %c = clientid
|
||||
|
||||
%% HMGET mqtt_user:%u is_superuser
|
||||
{supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]}.
|
||||
|
||||
%% HMGET mqtt_user:%u password
|
||||
{authcmd, ["HGET", "mqtt_user:%u", "password"]}.
|
||||
{authcmd, "HGET mqtt_user:%u password"}.
|
||||
|
||||
%% Password hash algorithm: plain, md5, sha, sha256, pbkdf2?
|
||||
{password_hash, sha256}.
|
||||
|
||||
%% SMEMBERS mqtt_acl:%u
|
||||
{aclcmd, ["SMEMBERS", "mqtt_acl:%u"]}.
|
||||
%% HMGET mqtt_user:%u is_superuser
|
||||
{supercmd, "HGET mqtt_user:%u is_superuser"}.
|
||||
|
||||
%% HGETALL mqtt_acl:%u
|
||||
{aclcmd, "HGETALL mqtt_acl:%u"}.
|
||||
|
||||
%% If no rules matched, return...
|
||||
{acl_nomatch, deny}.
|
||||
|
||||
%% Load Subscriptions form Redis when client connected.
|
||||
{subcmd, ["HGETALL", "mqtt_subs:%u"]}.
|
||||
{subcmd, "HGETALL mqtt_sub:%u"}.
|
||||
|
||||
|
||||
Redis User HASH
|
||||
Redis User Hash
|
||||
---------------
|
||||
|
||||
Set a 'user' hash with 'password' field, for example::
|
||||
|
@ -442,16 +441,18 @@ Set a 'user' hash with 'password' field, for example::
|
|||
HSET mqtt_user:<username> is_superuser 1
|
||||
HSET mqtt_user:<username> password "passwd"
|
||||
|
||||
Redis ACL Rule SET
|
||||
------------------
|
||||
Redis ACL Rule Hash
|
||||
-------------------
|
||||
|
||||
The plugin uses a redis SET to store ACL rules::
|
||||
The plugin uses a redis Hash to store ACL rules::
|
||||
|
||||
SADD mqtt_acl:<username> "publish topic1"
|
||||
SADD mqtt_acl:<username> "subscribe topic2"
|
||||
SADD mqtt_acl:<username> "pubsub topic3"
|
||||
HSET mqtt_acl:<username> topic1 1
|
||||
HSET mqtt_acl:<username> topic2 2
|
||||
HSET mqtt_acl:<username> topic3 3
|
||||
|
||||
Redis Subscription HASH
|
||||
.. NOTE:: 1: subscribe, 2: publish, 3: pubsub
|
||||
|
||||
Redis Subscription Hash
|
||||
-----------------------
|
||||
|
||||
The plugin can store static subscriptions in a redis Hash::
|
||||
|
@ -495,14 +496,14 @@ etc/plugins/emqttd_plugin_mongo.conf:
|
|||
%% Variables: %u = username, %c = clientid
|
||||
|
||||
%% Superuser Query
|
||||
{superquery, pool, [
|
||||
{superquery, [
|
||||
{collection, "mqtt_user"},
|
||||
{super_field, "is_superuser"},
|
||||
{selector, {"username", "%u"}}
|
||||
]}.
|
||||
|
||||
%% Authentication Query
|
||||
{authquery, pool, [
|
||||
{authquery, [
|
||||
{collection, "mqtt_user"},
|
||||
{password_field, "password"},
|
||||
%% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
|
||||
|
@ -511,7 +512,7 @@ etc/plugins/emqttd_plugin_mongo.conf:
|
|||
]}.
|
||||
|
||||
%% ACL Query: "%u" = username, "%c" = clientid
|
||||
{aclquery, pool, [
|
||||
{aclquery, [
|
||||
{collection, "mqtt_acl"},
|
||||
{selector, {"username", "%u"}}
|
||||
]}.
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{deps, [
|
||||
{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}}
|
||||
{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}}
|
||||
]}.
|
||||
{erl_opts, [{parse_transform,lager_transform}]}.
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
is_subscribed/2, subscriber_down/1]).
|
||||
|
||||
%% Hooks API
|
||||
-export([hook/4, hook/3, unhook/2, run_hooks/3]).
|
||||
-export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]).
|
||||
|
||||
%% Debug API
|
||||
-export([dump/0]).
|
||||
|
@ -151,6 +151,10 @@ hook(Hook, Function, InitArgs, Priority) ->
|
|||
unhook(Hook, Function) ->
|
||||
emqttd_hook:delete(Hook, Function).
|
||||
|
||||
-spec(run_hooks(atom(), list(any())) -> ok | stop).
|
||||
run_hooks(Hook, Args) ->
|
||||
emqttd_hook:run(Hook, Args).
|
||||
|
||||
-spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}).
|
||||
run_hooks(Hook, Args, Acc) ->
|
||||
emqttd_hook:run(Hook, Args, Acc).
|
||||
|
|
|
@ -60,6 +60,7 @@ auth(_Client, _Password, []) ->
|
|||
auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
|
||||
case catch Mod:check(Client, Password, State) of
|
||||
ok -> ok;
|
||||
{ok, IsSuper} -> {ok, IsSuper};
|
||||
ignore -> auth(Client, Password, Mods);
|
||||
{error, Reason} -> {error, Reason};
|
||||
{'EXIT', Error} -> {error, Error}
|
||||
|
|
|
@ -24,16 +24,16 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-callback init(AclOpts :: list()) -> {ok, State :: any()}.
|
||||
-callback(init(AclOpts :: list()) -> {ok, State :: any()}).
|
||||
|
||||
-callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
|
||||
Client :: mqtt_client(),
|
||||
PubSub :: pubsub(),
|
||||
Topic :: binary().
|
||||
-callback(check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
|
||||
Client :: mqtt_client(),
|
||||
PubSub :: pubsub(),
|
||||
Topic :: binary()).
|
||||
|
||||
-callback reload_acl(State :: any()) -> ok | {error, any()}.
|
||||
-callback(reload_acl(State :: any()) -> ok | {error, any()}).
|
||||
|
||||
-callback description() -> string().
|
||||
-callback(description() -> string()).
|
||||
|
||||
-else.
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ start(_StartType, _StartArgs) ->
|
|||
start_servers(Sup),
|
||||
emqttd_cli:load(),
|
||||
load_all_mods(),
|
||||
emqttd_plugins:init(),
|
||||
emqttd_plugins:load(),
|
||||
start_listeners(),
|
||||
register(emqttd, self()),
|
||||
|
|
|
@ -14,14 +14,13 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Authentication Behaviour.
|
||||
-module(emqttd_auth_mod).
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-export([passwd_hash/2]).
|
||||
|
||||
-type hash_type() :: plain | md5 | sha | sha256.
|
||||
-type(hash_type() :: plain | md5 | sha | sha256).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Authentication behavihour
|
||||
|
@ -29,14 +28,14 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-callback init(AuthOpts :: list()) -> {ok, State :: any()}.
|
||||
-callback(init(AuthOpts :: list()) -> {ok, State :: any()}).
|
||||
|
||||
-callback check(Client, Password, State) -> ok | ignore | {error, string()} when
|
||||
Client :: mqtt_client(),
|
||||
Password :: binary(),
|
||||
State :: any().
|
||||
-callback(check(Client, Password, State) -> ok | | {ok, boolean()} | ignore | {error, string()} when
|
||||
Client :: mqtt_client(),
|
||||
Password :: binary(),
|
||||
State :: any()).
|
||||
|
||||
-callback description() -> string().
|
||||
-callback(description() -> string()).
|
||||
|
||||
-else.
|
||||
|
||||
|
|
|
@ -160,5 +160,5 @@ md5_hash(SaltBin, Password) ->
|
|||
erlang:md5(<<SaltBin/binary, Password/binary>>).
|
||||
|
||||
salt() ->
|
||||
emqttd_time:seed(), Salt = random:uniform(16#ffffffff), <<Salt:32>>.
|
||||
emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <<Salt:32>>.
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
-export([start_link/0]).
|
||||
|
||||
%% Hooks API
|
||||
-export([add/3, add/4, delete/2, run/3, lookup/1]).
|
||||
-export([add/3, add/4, delete/2, run/2, run/3, lookup/1]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
|
@ -63,10 +63,26 @@ add(HookPoint, Function, InitArgs, Priority) ->
|
|||
delete(HookPoint, Function) ->
|
||||
gen_server:call(?MODULE, {delete, HookPoint, Function}).
|
||||
|
||||
-spec(run(atom(), list(any()), any()) -> any()).
|
||||
%% @doc Run hooks without Acc.
|
||||
-spec(run(atom(), list(Arg :: any())) -> ok | stop).
|
||||
run(HookPoint, Args) ->
|
||||
run_(lookup(HookPoint), Args).
|
||||
|
||||
-spec(run(atom(), list(Arg :: any()), any()) -> any()).
|
||||
run(HookPoint, Args, Acc) ->
|
||||
run_(lookup(HookPoint), Args, Acc).
|
||||
|
||||
%% @private
|
||||
run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) ->
|
||||
case apply(Fun, lists:append([Args, InitArgs])) of
|
||||
ok -> run_(Callbacks, Args);
|
||||
stop -> stop;
|
||||
_Any -> run_(Callbacks, Args)
|
||||
end;
|
||||
|
||||
run_([], _Args) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) ->
|
||||
case apply(Fun, lists:append([Args, [Acc], InitArgs])) of
|
||||
|
|
|
@ -18,12 +18,27 @@
|
|||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-export([init/0]).
|
||||
|
||||
-export([load/0, unload/0]).
|
||||
|
||||
-export([load/1, unload/1]).
|
||||
|
||||
-export([list/0]).
|
||||
|
||||
init() ->
|
||||
case emqttd:conf(plugins_etc_dir) of
|
||||
{ok, PluginsEtc} ->
|
||||
CfgFiles = filelib:wildcard("*.conf", PluginsEtc),
|
||||
lists:foreach(fun(CfgFile) ->
|
||||
App = app_name(CfgFile),
|
||||
application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)),
|
||||
gen_conf:init(App)
|
||||
end, CfgFiles);
|
||||
undefined ->
|
||||
ok
|
||||
end.
|
||||
|
||||
%% @doc Load all plugins when the broker started.
|
||||
-spec(load() -> list() | {error, any()}).
|
||||
load() ->
|
||||
|
@ -90,11 +105,11 @@ list() ->
|
|||
end.
|
||||
|
||||
plugin(CfgFile) ->
|
||||
[AppName | _] = string:tokens(CfgFile, "."),
|
||||
{ok, Attrs} = application:get_all_key(list_to_atom(AppName)),
|
||||
AppName = app_name(CfgFile),
|
||||
{ok, Attrs} = application:get_all_key(AppName),
|
||||
Ver = proplists:get_value(vsn, Attrs, "0"),
|
||||
Descr = proplists:get_value(description, Attrs, ""),
|
||||
#mqtt_plugin{name = list_to_atom(AppName), version = Ver, descr = Descr}.
|
||||
#mqtt_plugin{name = AppName, version = Ver, descr = Descr}.
|
||||
|
||||
%% @doc Load a Plugin
|
||||
-spec(load(atom()) -> ok | {error, any()}).
|
||||
|
@ -185,6 +200,9 @@ stop_app(App) ->
|
|||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
app_name(File) ->
|
||||
[AppName | _] = string:tokens(File, "."), list_to_atom(AppName).
|
||||
|
||||
names(plugin) ->
|
||||
names(list());
|
||||
|
||||
|
@ -244,4 +262,3 @@ write_loaded(AppNames) ->
|
|||
lager:error("Open File ~p Error: ~p", [File, Error]),
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
%% Protocol State
|
||||
-record(proto_state, {peername, sendfun, connected = false,
|
||||
client_id, client_pid, clean_sess,
|
||||
proto_ver, proto_name, username,
|
||||
proto_ver, proto_name, username, is_superuser = false,
|
||||
will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
|
||||
session, ws_initial_headers, %% Headers from first HTTP request for websocket client
|
||||
connected_at}).
|
||||
|
@ -159,8 +159,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
{ReturnCode1, SessPresent, State3} =
|
||||
case validate_connect(Var, State1) of
|
||||
?CONNACK_ACCEPT ->
|
||||
case emqttd_access_control:auth(client(State1), Password) of
|
||||
ok ->
|
||||
case authenticate(client(State1), Password) of
|
||||
{ok, IsSuperuser} ->
|
||||
%% Generate clientId if null
|
||||
State2 = maybe_set_clientid(State1),
|
||||
|
||||
|
@ -172,7 +172,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
%% Start keepalive
|
||||
start_keepalive(KeepAlive),
|
||||
%% ACCEPT
|
||||
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}};
|
||||
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
|
||||
{error, Error} ->
|
||||
exit({shutdown, Error})
|
||||
end;
|
||||
|
@ -186,14 +186,14 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
%% Run hooks
|
||||
emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)),
|
||||
%% Send connack
|
||||
send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
|
||||
send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
|
||||
%% stop if authentication failure
|
||||
stop_if_auth_failure(ReturnCode1, State3);
|
||||
|
||||
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
|
||||
case check_acl(publish, Topic, client(State)) of
|
||||
allow ->
|
||||
publish(Packet, State);
|
||||
deny ->
|
||||
?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
|
||||
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) ->
|
||||
case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of
|
||||
true -> publish(Packet, State);
|
||||
false -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
|
||||
end,
|
||||
{ok, State};
|
||||
|
||||
|
@ -216,11 +216,14 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi
|
|||
process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
||||
send(?SUBACK_PACKET(PacketId, []), State);
|
||||
|
||||
process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{
|
||||
client_id = ClientId, username = Username, session = Session}) ->
|
||||
Client = client(State),
|
||||
TopicTable = parse_topic_table(RawTopicTable),
|
||||
AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable],
|
||||
%% TODO: refactor later...
|
||||
process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session,
|
||||
client_id = ClientId, username = Username, is_superuser = IsSuperuser}) ->
|
||||
Client = client(State), TopicTable = parse_topic_table(RawTopicTable),
|
||||
AllowDenies = if
|
||||
IsSuperuser -> [];
|
||||
true -> [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable]
|
||||
end,
|
||||
case lists:member(deny, AllowDenies) of
|
||||
true ->
|
||||
?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
|
||||
|
@ -297,6 +300,12 @@ trace(send, Packet, ProtoState) ->
|
|||
redeliver({?PUBREL, PacketId}, State) ->
|
||||
send(?PUBREL_PACKET(PacketId), State).
|
||||
|
||||
stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
|
||||
{stop, {shutdown, auth_failure}, State};
|
||||
|
||||
stop_if_auth_failure(_RC, State) ->
|
||||
{ok, State}.
|
||||
|
||||
shutdown(_Error, #proto_state{client_id = undefined}) ->
|
||||
ignore;
|
||||
|
||||
|
@ -430,6 +439,13 @@ parse_topic_table(TopicTable) ->
|
|||
parse_topics(Topics) ->
|
||||
[emqttd_topic:parse(Topic) || Topic <- Topics].
|
||||
|
||||
authenticate(Client, Password) ->
|
||||
case emqttd_access_control:auth(Client, Password) of
|
||||
ok -> {ok, false};
|
||||
{ok, IsSuper} -> {ok, IsSuper};
|
||||
{error, Error} -> {error, Error}
|
||||
end.
|
||||
|
||||
%% PUBLISH ACL is cached in process dictionary.
|
||||
check_acl(publish, Topic, Client) ->
|
||||
IfCache = emqttd:conf(cache_acl, true),
|
||||
|
|
|
@ -235,6 +235,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
|
|||
collect_interval = get_value(collect_interval, SessEnv, 0),
|
||||
timestamp = os:timestamp()},
|
||||
emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),
|
||||
emqttd:run_hooks('session.created', [ClientId, Username]),
|
||||
%% Start statistics
|
||||
{ok, start_collector(Session), hibernate}.
|
||||
|
||||
|
@ -519,7 +520,8 @@ handle_info(expired, Session) ->
|
|||
handle_info(Info, Session) ->
|
||||
?UNEXPECTED_INFO(Info, Session).
|
||||
|
||||
terminate(_Reason, #session{client_id = ClientId}) ->
|
||||
terminate(Reason, #session{client_id = ClientId, username = Username}) ->
|
||||
emqttd:run_hooks('session.terminated', [ClientId, Username, Reason]),
|
||||
emqttd:subscriber_down(ClientId),
|
||||
emqttd_sm:unreg_session(ClientId).
|
||||
|
||||
|
|
|
@ -37,7 +37,9 @@ all() ->
|
|||
{group, stats},
|
||||
{group, hook},
|
||||
{group, http},
|
||||
{group, cluster},
|
||||
%%{group, backend},
|
||||
{group, alarms},
|
||||
{group, cli}].
|
||||
|
||||
groups() ->
|
||||
|
@ -69,7 +71,19 @@ groups() ->
|
|||
{http, [sequence],
|
||||
[request_status,
|
||||
request_publish
|
||||
% websocket_test
|
||||
]},
|
||||
{cluster, [sequence],
|
||||
[cluster_test,
|
||||
cluster_join,
|
||||
cluster_leave,
|
||||
cluster_remove,
|
||||
cluster_remove2,
|
||||
cluster_node_down
|
||||
]},
|
||||
{alarms, [sequence],
|
||||
[set_alarms]
|
||||
},
|
||||
{cli, [sequence],
|
||||
[ctl_register_cmd,
|
||||
cli_status,
|
||||
|
@ -323,11 +337,16 @@ add_delete_hook(_) ->
|
|||
[] = emqttd_hook:lookup(emqttd_hook).
|
||||
|
||||
run_hooks(_) ->
|
||||
emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]),
|
||||
emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]),
|
||||
emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]),
|
||||
{stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []),
|
||||
{ok, []} = emqttd:run_hooks(unknown_hook, [], []).
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
|
||||
emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
|
||||
{stop, [r3, r2]} = emqttd:run_hooks(foldl_hook, [arg1, arg2], []),
|
||||
{ok, []} = emqttd:run_hooks(unknown_hook, [], []),
|
||||
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
|
||||
emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
|
||||
stop = emqttd:run_hooks(foreach_hook, [arg]).
|
||||
|
||||
hook_fun1([]) -> ok.
|
||||
hook_fun2([]) -> {ok, []}.
|
||||
|
@ -336,6 +355,10 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok.
|
|||
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
|
||||
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
|
||||
|
||||
hook_fun6(arg, initArg) -> ok.
|
||||
hook_fun7(arg, initArg) -> any.
|
||||
hook_fun8(arg, initArg) -> stop.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% HTTP Request Test
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -378,6 +401,102 @@ auth_header_(User, Pass) ->
|
|||
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
|
||||
{"Authorization","Basic " ++ Encoded}.
|
||||
|
||||
websocket_test(_) ->
|
||||
% Conn = esockd_connection:new(esockd_transport, nil, []),
|
||||
% Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1},
|
||||
% mochiweb_headers:make([{"Sec-WebSocket-Protocol","mqtt"},
|
||||
% {"Upgrade","websocket"}
|
||||
% ])),
|
||||
Req = "GET " ++ "/mqtt" ++" HTTP/1.1\r\nUpgrade: WebSocket\r\nConnection: Upgrade\r\n" ++
|
||||
"Host: " ++ "127.0.0.1"++ "\r\n" ++
|
||||
"Origin: http://" ++ "127.0.0.1" ++ "/\r\n\r\n",
|
||||
|
||||
ct:log("Req:~p", [Req]),
|
||||
emqttd_http:handle_request(Req).
|
||||
%%--------------------------------------------------------------------
|
||||
%% cluster group
|
||||
%%--------------------------------------------------------------------
|
||||
cluster_test(_Config) ->
|
||||
Z = slave(emqttd, cluster_test_z),
|
||||
wait_running(Z),
|
||||
true = emqttd:is_running(Z),
|
||||
Node = node(),
|
||||
ok = rpc:call(Z, emqttd_cluster, join, [Node]),
|
||||
[Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)),
|
||||
ct:log("Z:~p, Node:~p", [Z, Node]),
|
||||
ok = rpc:call(Z, emqttd_cluster, leave, []),
|
||||
[Node] = lists:sort(mnesia:system_info(running_db_nodes)),
|
||||
ok = slave:stop(Z).
|
||||
|
||||
cluster_join(_) ->
|
||||
Z = slave(emqttd, cluster_join_z),
|
||||
N = slave(node, cluster_join_n),
|
||||
wait_running(Z),
|
||||
true = emqttd:is_running(Z),
|
||||
Node = node(),
|
||||
{error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node),
|
||||
{error, {node_not_running, N}} = emqttd_cluster:join(N),
|
||||
ok = emqttd_cluster:join(Z),
|
||||
slave:stop(Z),
|
||||
slave:stop(N).
|
||||
|
||||
cluster_leave(_) ->
|
||||
Z = slave(emqttd, cluster_leave_z),
|
||||
wait_running(Z),
|
||||
{error, node_not_in_cluster} = emqttd_cluster:leave(),
|
||||
ok = emqttd_cluster:join(Z),
|
||||
Node = node(),
|
||||
[Z, Node] = emqttd_mnesia:running_nodes(),
|
||||
ok = emqttd_cluster:leave(),
|
||||
[Node] = emqttd_mnesia:running_nodes(),
|
||||
slave:stop(Z).
|
||||
|
||||
cluster_remove(_) ->
|
||||
Z = slave(emqttd, cluster_remove_z),
|
||||
wait_running(Z),
|
||||
Node = node(),
|
||||
{error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node),
|
||||
ok = emqttd_cluster:join(Z),
|
||||
[Z, Node] = emqttd_mnesia:running_nodes(),
|
||||
ok = emqttd_cluster:remove(Z),
|
||||
[Node] = emqttd_mnesia:running_nodes(),
|
||||
slave:stop(Z).
|
||||
|
||||
cluster_remove2(_) ->
|
||||
Z = slave(emqttd, cluster_remove2_z),
|
||||
wait_running(Z),
|
||||
ok = emqttd_cluster:join(Z),
|
||||
Node = node(),
|
||||
[Z, Node] = emqttd_mnesia:running_nodes(),
|
||||
ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []),
|
||||
ok = emqttd_cluster:remove(Z),
|
||||
[Node] = emqttd_mnesia:running_nodes(),
|
||||
slave:stop(Z).
|
||||
|
||||
cluster_node_down(_) ->
|
||||
Z = slave(emqttd, cluster_node_down),
|
||||
timer:sleep(1000),
|
||||
wait_running(Z),
|
||||
ok = emqttd_cluster:join(Z),
|
||||
ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]),
|
||||
ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]),
|
||||
Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
|
||||
ct:log("Routes: ~p~n", [Routes]),
|
||||
[<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes],
|
||||
slave:stop(Z),
|
||||
timer:sleep(1000),
|
||||
Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)).
|
||||
|
||||
set_alarms(_) ->
|
||||
AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
|
||||
emqttd_alarm:set_alarm(AlarmTest),
|
||||
Alarms = emqttd_alarm:get_alarms(),
|
||||
?assertEqual(1, length(Alarms)),
|
||||
emqttd_alarm:clear_alarm(<<"1">>),
|
||||
[] = emqttd_alarm:get_alarms().
|
||||
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cli group
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -451,3 +570,32 @@ cli_vm(_) ->
|
|||
emqttd_cli:vm([]),
|
||||
emqttd_cli:vm(["ports"]).
|
||||
|
||||
|
||||
ensure_ok(ok) -> ok;
|
||||
ensure_ok({error, {already_started, _}}) -> ok.
|
||||
|
||||
host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
|
||||
|
||||
wait_running(Node) ->
|
||||
wait_running(Node, 30000).
|
||||
|
||||
wait_running(Node, Timeout) when Timeout < 0 ->
|
||||
throw({wait_timeout, Node});
|
||||
|
||||
wait_running(Node, Timeout) ->
|
||||
case rpc:call(Node, emqttd, is_running, [Node]) of
|
||||
true -> ok;
|
||||
false -> timer:sleep(100),
|
||||
wait_running(Node, Timeout - 100)
|
||||
end.
|
||||
|
||||
slave(emqttd, Node) ->
|
||||
{ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
|
||||
rpc:call(Emq, application, ensure_all_started, [emqttd]),
|
||||
Emq;
|
||||
|
||||
slave(node, Node) ->
|
||||
{ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
|
||||
N.
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue