BIND 10 master, updated. 70066be6e51afc479dc485dfdf9ab79169a9d3c1 [master] Merge branch 'trac2823'
BIND 10 source code commits
bind10-changes at lists.isc.org
Tue May 7 20:25:42 UTC 2013
The branch, master has been updated
via 70066be6e51afc479dc485dfdf9ab79169a9d3c1 (commit)
via c630ff8a3861c5597a5c77a81eee52c09aaef643 (commit)
via 7ccddd64de7f7928d20f7eff23053b64c644349d (commit)
via 5bdee1a5056c8e3517d2dd3de954d40323027ee1 (commit)
via 034e1d3b1490054f265c4e4ac20cfa43127a5625 (commit)
via b308ba3bf71ccc768f302022e7ab571ff2cfbdfd (commit)
via 72604e2508ea78f0f0fcce74b6e34838e75675a2 (commit)
via ac3da26ee0505f828be081d284b4216f5ada820a (commit)
via 79d4b470181b50699125d1cf3394e11071f6178d (commit)
via b6de5342bd648c4f38e892df6c93177c243ec993 (commit)
via 0a89b921bb8cb268721e8ca38d99f7ea967fdf8c (commit)
via 5ee122cfa69f8bd6be3c4861407a3da35cb72bdb (commit)
via 5ee0c3b143458e52d59e3e162c5d9cd4e11e0d1f (commit)
via 2a8863be72c4fdf0b9aba1e05e525de7fe18d4b4 (commit)
via 861a1055e8d48f682b5aed9942f1eb1963d93a55 (commit)
via a6328cdc2907b9becde3d261373521dc1a3b963f (commit)
via 1c295702731e5fb76d8ea39c14104347fb0a8b91 (commit)
via 35df226d16838f37390270d9d3d1a5e736cfcd12 (commit)
via 2aaa162fd5f5921aac58baeeb0dd026b7a0a2fbb (commit)
via db5ddcf7b62eb21d9cf0f057ee45ec31adc832f3 (commit)
via 125dd1132b09275fbb528ebdd4358f5977f17402 (commit)
via f82f47b299e9f48c86c6c8f7967f29c381d5df06 (commit)
via ca3018c0273b41dac1f67892cdf59e3673a183f5 (commit)
via 8250e0313cf3e7f81646e8dd978f4a16f0578222 (commit)
from 45e7d86237a1b483d27819c5f5fe0d5c6ebb6dc4 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit 70066be6e51afc479dc485dfdf9ab79169a9d3c1
Merge: 45e7d86 c630ff8
Author: JINMEI Tatuya <jinmei at isc.org>
Date: Tue May 7 13:25:37 2013 -0700
[master] Merge branch 'trac2823'
-----------------------------------------------------------------------
Summary of changes:
src/bin/stats/tests/Makefile.am | 2 +-
...b10-stats-httpd_test.py => stats-httpd_test.py} | 229 +++++++-----
.../tests/{b10-stats_test.py => stats_test.py} | 343 ++++++++++--------
src/bin/stats/tests/test_utils.py | 363 +++++++-------------
4 files changed, 451 insertions(+), 486 deletions(-)
rename src/bin/stats/tests/{b10-stats-httpd_test.py => stats-httpd_test.py} (87%)
rename src/bin/stats/tests/{b10-stats_test.py => stats_test.py} (88%)
-----------------------------------------------------------------------
diff --git a/src/bin/stats/tests/Makefile.am b/src/bin/stats/tests/Makefile.am
index a5ff4e5..06d11a1 100644
--- a/src/bin/stats/tests/Makefile.am
+++ b/src/bin/stats/tests/Makefile.am
@@ -1,7 +1,7 @@
SUBDIRS = testdata .
PYCOVERAGE_RUN = @PYCOVERAGE_RUN@
-PYTESTS = b10-stats_test.py b10-stats-httpd_test.py
+PYTESTS = stats_test.py stats-httpd_test.py
EXTRA_DIST = $(PYTESTS) test_utils.py
CLEANFILES = test_utils.pyc
diff --git a/src/bin/stats/tests/b10-stats-httpd_test.py b/src/bin/stats/tests/b10-stats-httpd_test.py
deleted file mode 100644
index 9a463ab..0000000
--- a/src/bin/stats/tests/b10-stats-httpd_test.py
+++ /dev/null
@@ -1,1082 +0,0 @@
-# Copyright (C) 2011-2012 Internet Systems Consortium.
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notice and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
-# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
-# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
-# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
-# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-"""
-In each of these tests we start several virtual components. They are
-not the real components, no external processes are started. They are
-just simple mock objects running each in its own thread and pretending
-to be bind10 modules. This helps testing the stats http server in a
-close to real environment.
-"""
-
-import unittest
-import os
-import imp
-import socket
-import errno
-import select
-import string
-import time
-import threading
-import http.client
-import xml.etree.ElementTree
-import random
-import urllib.parse
-import sys
-# load this module for xml validation with xsd. For this test, an
-# installation of lxml is required in advance. See http://lxml.de/.
-try:
- from lxml import etree as lxml_etree
-except ImportError:
- lxml_etree = None
-
-import isc
-import isc.log
-import stats_httpd
-import stats
-from test_utils import BaseModules, ThreadingServerManager, MyStats,\
- MyStatsHttpd, SignalHandler,\
- send_command, CONST_BASETIME
-from isc.testutils.ccsession_mock import MockModuleCCSession
-
-# This test suite uses xml.etree.ElementTree.XMLParser via
-# xml.etree.ElementTree.parse. On the platform where expat isn't
-# installed, ImportError is raised and it's failed. Check expat is
-# available before the test invocation. Skip this test if it's
-# unavailable.
-try:
- # ImportError raised if xpat is unavailable
- xml_parser = xml.etree.ElementTree.XMLParser()
-except ImportError:
- xml_parser = None
-
-# set XML Namespaces for testing
-XMLNS_XSL = "http://www.w3.org/1999/XSL/Transform"
-XMLNS_XHTML = "http://www.w3.org/1999/xhtml"
-XMLNS_XSD = "http://www.w3.org/2001/XMLSchema"
-XMLNS_XSI = stats_httpd.XMLNS_XSI
-
-DUMMY_DATA = {
- 'Init' : {
- "boot_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME)
- },
- 'Auth' : {
- "queries.tcp": 6,
- "queries.udp": 4,
- "queries.perzone": [{
- "zonename": "test1.example",
- "queries.tcp": 10,
- "queries.udp": 8
- }, {
- "zonename": "test2.example",
- "queries.tcp": 8,
- "queries.udp": 6
- }],
- "nds_queries.perzone": {
- "test10.example": {
- "queries.tcp": 10,
- "queries.udp": 8
- },
- "test20.example": {
- "queries.tcp": 8,
- "queries.udp": 6
- }
- }
- },
- 'Stats' : {
- "report_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
- "boot_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
- "last_update_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
- "lname": "4d70d40a_c at host",
- "timestamp": time.mktime(CONST_BASETIME)
- }
- }
-
-def get_availaddr(address='127.0.0.1', port=8001):
- """returns a tuple of address and port which is available to
- listen on the platform. The first argument is a address for
- search. The second argument is a port for search. If a set of
- address and port is failed on the search for the availability, the
- port number is increased and it goes on the next trial until the
- available set of address and port is looked up. If the port number
- reaches over 65535, it may stop the search and raise a
- OverflowError exception."""
- while True:
- for addr in socket.getaddrinfo(
- address, port, 0,
- socket.SOCK_STREAM, socket.IPPROTO_TCP):
- sock = socket.socket(addr[0], socket.SOCK_STREAM)
- try:
- sock.bind((address, port))
- return (address, port)
- except socket.error:
- continue
- finally:
- if sock: sock.close()
- # This address and port number are already in use.
- # next port number is added
- port = port + 1
-
-def is_ipv6_enabled(address='::1', port=8001):
- """checks IPv6 enabled on the platform. address for check is '::1'
- and port for check is random number between 8001 and
- 65535. Retrying is 3 times even if it fails. The built-in socket
- module provides a 'has_ipv6' parameter, but it's not used here
- because there may be a situation where the value is True on an
- environment where the IPv6 config is disabled."""
- for p in random.sample(range(port, 65535), 3):
- try:
- sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- sock.bind((address, p))
- return True
- except socket.error:
- continue
- finally:
- if sock: sock.close()
- return False
-
-class TestItemNameList(unittest.TestCase):
-
- def test_item_name_list(self):
- # for a one-element list
- self.assertEqual(['a'],
- stats_httpd.item_name_list({'a':1}, 'a'))
- # for a dict under a dict
- self.assertEqual(['a','a/b'],
- stats_httpd.item_name_list({'a':{'b':1}}, 'a'))
- self.assertEqual(['a/b'],
- stats_httpd.item_name_list({'a':{'b':1}}, 'a/b'))
- self.assertEqual(['a','a/b','a/b/c'],
- stats_httpd.item_name_list({'a':{'b':{'c':1}}}, 'a'))
- self.assertEqual(['a/b','a/b/c'],
- stats_httpd.item_name_list({'a':{'b':{'c':1}}},
- 'a/b'))
- self.assertEqual(['a/b/c'],
- stats_httpd.item_name_list({'a':{'b':{'c':1}}},
- 'a/b/c'))
- # for a list under a dict
- self.assertEqual(['a[2]'],
- stats_httpd.item_name_list({'a':[1,2,3]}, 'a[2]'))
- self.assertEqual(['a', 'a[0]', 'a[1]', 'a[2]'],
- stats_httpd.item_name_list({'a':[1,2,3]}, 'a'))
- self.assertEqual(['a', 'a[0]', 'a[1]', 'a[2]'],
- stats_httpd.item_name_list({'a':[1,2,3]}, ''))
- # for a list under a dict under a dict
- self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
- stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, 'a'))
- self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
- stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, ''))
- self.assertEqual(['a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
- stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, 'a/b'))
- # for a mixed case of the above
- self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]', 'a/c'],
- stats_httpd.item_name_list(
- {'a':{'b':[1,2,3], 'c':1}}, 'a'))
- self.assertEqual(['a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
- stats_httpd.item_name_list(
- {'a':{'b':[1,2,3], 'c':1}}, 'a/b'))
- self.assertEqual(['a/c'],
- stats_httpd.item_name_list(
- {'a':{'b':[1,2,3], 'c':1}}, 'a/c'))
- # for specifying a wrong identifier which is not found in
- # element
- self.assertRaises(isc.cc.data.DataNotFoundError,
- stats_httpd.item_name_list, {'x':1}, 'a')
- # for specifying a string in element and an empty string in
- # identifier
- self.assertEqual([],
- stats_httpd.item_name_list('a', ''))
- # for specifying empty strings in element and identifier
- self.assertEqual([],
- stats_httpd.item_name_list('', ''))
- # for specifying wrong element, which is an non-empty string,
- # and an non-empty string in identifier
- self.assertRaises(isc.cc.data.DataTypeError,
- stats_httpd.item_name_list, 'a', 'a')
- # for specifying None in element and identifier
- self.assertRaises(isc.cc.data.DataTypeError,
- stats_httpd.item_name_list, None, None)
- # for specifying non-dict in element
- self.assertRaises(isc.cc.data.DataTypeError,
- stats_httpd.item_name_list, [1,2,3], 'a')
- self.assertRaises(isc.cc.data.DataTypeError,
- stats_httpd.item_name_list, [1,2,3], '')
- # for checking key names sorted which consist of element
- num = 11
- keys = [ 'a', 'aa', 'b' ]
- keys.sort(reverse=True)
- dictlist = dict([ (k, list(range(num))) for k in keys ])
- keys.sort()
- ans = []
- for k in keys:
- ans += [k] + [ '%s[%d]' % (k, i) for i in range(num) ]
- self.assertEqual(ans,
- stats_httpd.item_name_list(dictlist, ''))
-
-class TestHttpHandler(unittest.TestCase):
- """Tests for HttpHandler class"""
- def setUp(self):
- # set the signal handler for deadlock
- self.sig_handler = SignalHandler(self.fail)
- self.base = BaseModules()
- self.stats_server = ThreadingServerManager(MyStats)
- self.stats = self.stats_server.server
- DUMMY_DATA['Stats']['lname'] = self.stats.cc_session.lname
- self.stats_server.run()
- (self.address, self.port) = get_availaddr()
- self.stats_httpd_server = ThreadingServerManager(MyStatsHttpd, (self.address, self.port))
- self.stats_httpd = self.stats_httpd_server.server
- self.stats_httpd_server.run()
- self.client = http.client.HTTPConnection(self.address, self.port)
- self.client._http_vsn_str = 'HTTP/1.0\n'
- self.client.connect()
-
- def tearDown(self):
- self.client.close()
- self.stats_httpd_server.shutdown()
- self.stats_server.shutdown()
- self.base.shutdown()
- # reset the signal handler
- self.sig_handler.reset()
-
- @unittest.skipIf(sys.version_info >= (3, 3), "Unsupported in Python 3.3 or higher")
- @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
- def test_do_GET(self):
- self.assertTrue(type(self.stats_httpd.httpd) is list)
- self.assertEqual(len(self.stats_httpd.httpd), 1)
- self.assertEqual((self.address, self.port), self.stats_httpd.http_addrs[0])
-
- def check_XML_URL_PATH(path=''):
- url_path = '%s/%s' % (stats_httpd.XML_URL_PATH, path)
- url_path = urllib.parse.quote(url_path)
- self.client.putrequest('GET', url_path)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.getheader("Content-type"), "text/xml")
- self.assertGreater(int(response.getheader("Content-Length")), 0)
- self.assertEqual(response.status, 200)
- xml_doctype = response.readline().decode()
- xsl_doctype = response.readline().decode()
- self.assertGreater(len(xml_doctype), 0)
- self.assertGreater(len(xsl_doctype), 0)
- root = xml.etree.ElementTree.parse(response).getroot()
- self.assertGreater(root.tag.find('statistics'), 0)
- schema_loc = '{%s}schemaLocation' % XMLNS_XSI
- # check the path of XSD
- self.assertEqual(root.attrib[schema_loc],
- stats_httpd.XSD_NAMESPACE + ' '
- + stats_httpd.XSD_URL_PATH)
- # check the path of XSL
- self.assertTrue(xsl_doctype.startswith(
- '<?xml-stylesheet type="text/xsl" href="' +
- stats_httpd.XSL_URL_PATH
- + '"?>'))
- # check whether the list of 'identifier' attributes in
- # root is same as the list of item names in DUMMY_DATA
- id_list = [ elm.attrib['identifier'] for elm in root ]
- item_list = [ it for it in \
- stats_httpd.item_name_list(DUMMY_DATA, path) \
- if len(it.split('/')) > 1 ]
- self.assertEqual(id_list, item_list)
- for elem in root:
- attr = elem.attrib
- value = isc.cc.data.find(DUMMY_DATA, attr['identifier'])
- # No 'value' attribute should be found in the 'item'
- # element when datatype of the value is list or dict.
- if type(value) is list or type(value) is dict:
- self.assertFalse('value' in attr)
- # The value of the 'value' attribute should be checked
- # after casting it to string type if datatype of the
- # value is int or float. Because attr['value'] returns
- # string type even if its value is int or float.
- elif type(value) is int or type(value) is float:
- self.assertEqual(attr['value'], str(value))
- else:
- self.assertEqual(attr['value'], value)
-
- # URL is '/bind10/statistics/xml'
- check_XML_URL_PATH()
- for path in stats_httpd.item_name_list(DUMMY_DATA, ''):
- check_XML_URL_PATH(path)
-
- def check_XSD_URL_PATH():
- url_path = stats_httpd.XSD_URL_PATH
- url_path = urllib.parse.quote(url_path)
- self.client.putrequest('GET', url_path)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.getheader("Content-type"), "text/xml")
- self.assertGreater(int(response.getheader("Content-Length")), 0)
- self.assertEqual(response.status, 200)
- root = xml.etree.ElementTree.parse(response).getroot()
- url_xmlschema = '{%s}' % XMLNS_XSD
- self.assertGreater(root.tag.find('schema'), 0)
- self.assertTrue(hasattr(root, 'attrib'))
- self.assertTrue('targetNamespace' in root.attrib)
- self.assertEqual(root.attrib['targetNamespace'],
- stats_httpd.XSD_NAMESPACE)
-
- # URL is '/bind10/statistics/xsd'
- check_XSD_URL_PATH()
-
- def check_XSL_URL_PATH():
- url_path = stats_httpd.XSL_URL_PATH
- url_path = urllib.parse.quote(url_path)
- self.client.putrequest('GET', url_path)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.getheader("Content-type"), "text/xml")
- self.assertGreater(int(response.getheader("Content-Length")), 0)
- self.assertEqual(response.status, 200)
- root = xml.etree.ElementTree.parse(response).getroot()
- url_trans = '{%s}' % XMLNS_XSL
- url_xhtml = '{%s}' % XMLNS_XHTML
- self.assertEqual(root.tag, url_trans + 'stylesheet')
-
- # URL is '/bind10/statistics/xsl'
- check_XSL_URL_PATH()
-
- # 302 redirect
- self.client._http_vsn_str = 'HTTP/1.1'
- self.client.putrequest('GET', '/')
- self.client.putheader('Host', self.address)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 302)
- self.assertEqual(response.getheader('Location'),
- "http://%s:%d%s/" % (self.address, self.port, stats_httpd.XML_URL_PATH))
-
- # 404 NotFound (random path)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', '/path/to/foo/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', '/bind10/foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', '/bind10/statistics/foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + 'Auth') # with no slash
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- # 200 ok
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/#foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/?foo=bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- # 404 NotFound (too long path)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Init/boot_time/a')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- # 404 NotFound (nonexistent module name)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Foo')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- # 404 NotFound (nonexistent item name)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Foo/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Foo/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Foo/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- # 404 NotFound (existent module but nonexistent item name)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Auth/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Auth/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
- self.client._http_vsn_str = 'HTTP/1.0'
- self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Auth/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- def test_do_GET_failed1(self):
- # checks status
- self.assertEqual(send_command("status", "Stats"),
- (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
- # failure case(Stats is down)
- self.assertTrue(self.stats.running)
- self.assertEqual(send_command("shutdown", "Stats"),
- (0, None)) # Stats is down
- self.assertFalse(self.stats.running)
- self.stats_httpd.cc_session.set_timeout(milliseconds=100)
-
- # request XML
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 500)
-
- # request XSD
- self.client.putrequest('GET', stats_httpd.XSD_URL_PATH)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- # request XSL
- self.client.putrequest('GET', stats_httpd.XSL_URL_PATH)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- def test_do_GET_failed2(self):
- # failure case(Stats replies an error)
- self.stats.mccs.set_command_handler(
- lambda cmd, args: \
- isc.config.ccsession.create_answer(1, "specified arguments are incorrect: I have an error.")
- )
-
- # request XML
- self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- # request XSD
- self.client.putrequest('GET', stats_httpd.XSD_URL_PATH)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- # request XSL
- self.client.putrequest('GET', stats_httpd.XSL_URL_PATH)
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- def test_do_HEAD(self):
- self.client.putrequest('HEAD', stats_httpd.XML_URL_PATH + '/')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 200)
-
- self.client.putrequest('HEAD', '/path/to/foo/bar')
- self.client.endheaders()
- response = self.client.getresponse()
- self.assertEqual(response.status, 404)
-
- @unittest.skipUnless(lxml_etree, "skipping XML validation with XSD")
- def test_xml_validation_with_xsd(self):
- """Tests for XML validation with XSD. If lxml is not
- installed, this tests would be skipped."""
- def request_xsd():
- url_path = stats_httpd.XSD_URL_PATH
- url_path = urllib.parse.quote(url_path)
- self.client.putrequest('GET', url_path)
- self.client.endheaders()
- xsd_doc = self.client.getresponse()
- xsd_doc = lxml_etree.parse(xsd_doc)
- return lxml_etree.XMLSchema(xsd_doc)
-
- def request_xmldoc(path=''):
- url_path = '%s/%s' % (stats_httpd.XML_URL_PATH, path)
- url_path = urllib.parse.quote(url_path)
- self.client.putrequest('GET', url_path)
- self.client.endheaders()
- xml_doc = self.client.getresponse()
- return lxml_etree.parse(xml_doc)
-
- # request XSD and XML
- xsd = request_xsd()
- xml_doc = request_xmldoc()
- # do validation
- self.assertTrue(xsd.validate(xml_doc))
-
- # validate each paths in DUMMY_DATA
- for path in stats_httpd.item_name_list(DUMMY_DATA, ''):
- # request XML
- xml_doc = request_xmldoc(path)
- # do validation
- self.assertTrue(xsd.validate(xml_doc))
-
-class TestHttpServerError(unittest.TestCase):
- """Tests for HttpServerError exception"""
- def test_raises(self):
- try:
- raise stats_httpd.HttpServerError('Nothing')
- except stats_httpd.HttpServerError as err:
- self.assertEqual(str(err), 'Nothing')
-
-class TestHttpServer(unittest.TestCase):
- """Tests for HttpServer class"""
- def setUp(self):
- # set the signal handler for deadlock
- self.sig_handler = SignalHandler(self.fail)
- self.base = BaseModules()
-
- def tearDown(self):
- if hasattr(self, "stats_httpd"):
- self.stats_httpd.stop()
- self.base.shutdown()
- # reset the signal handler
- self.sig_handler.reset()
-
- def test_httpserver(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.assertEqual(type(self.stats_httpd.httpd), list)
- self.assertEqual(len(self.stats_httpd.httpd), 1)
- for httpd in self.stats_httpd.httpd:
- self.assertTrue(isinstance(httpd, stats_httpd.HttpServer))
-
-class TestStatsHttpdError(unittest.TestCase):
- """Tests for StatsHttpdError exception"""
-
- def test_raises1(self):
- try:
- raise stats_httpd.StatsHttpdError('Nothing')
- except stats_httpd.StatsHttpdError as err:
- self.assertEqual(str(err), 'Nothing')
-
- def test_raises2(self):
- try:
- raise stats_httpd.StatsHttpdDataError('Nothing')
- except stats_httpd.StatsHttpdDataError as err:
- self.assertEqual(str(err), 'Nothing')
-
-class TestStatsHttpd(unittest.TestCase):
- """Tests for StatsHttpd class"""
-
- def setUp(self):
- # set the signal handler for deadlock
- self.sig_handler = SignalHandler(self.fail)
- self.base = BaseModules()
- self.stats_server = ThreadingServerManager(MyStats)
- self.stats_server.run()
- # checking IPv6 enabled on this platform
- self.ipv6_enabled = is_ipv6_enabled()
- # instantiation of StatsHttpd indirectly calls gethostbyaddr(), which
- # can block for an uncontrollable period, leading many undesirable
- # results. We should rather eliminate the reliance, but until we
- # can make such fundamental cleanup we replace it with a faked method;
- # in our test scenario the return value doesn't matter.
- self.__gethostbyaddr_orig = socket.gethostbyaddr
- socket.gethostbyaddr = lambda x: ('test.example.', [], None)
-
- def tearDown(self):
- socket.gethostbyaddr = self.__gethostbyaddr_orig
- if hasattr(self, "stats_httpd"):
- self.stats_httpd.stop()
- self.stats_server.shutdown()
- self.base.shutdown()
- # reset the signal handler
- self.sig_handler.reset()
-
- def test_init(self):
- server_address = get_availaddr()
- self.stats_httpd = MyStatsHttpd(server_address)
- self.assertEqual(self.stats_httpd.running, False)
- self.assertEqual(self.stats_httpd.poll_intval, 0.5)
- self.assertNotEqual(len(self.stats_httpd.httpd), 0)
- self.assertEqual(type(self.stats_httpd.mccs), isc.config.ModuleCCSession)
- self.assertEqual(type(self.stats_httpd.cc_session), isc.cc.Session)
- self.assertEqual(len(self.stats_httpd.config), 2)
- self.assertTrue('listen_on' in self.stats_httpd.config)
- self.assertEqual(len(self.stats_httpd.config['listen_on']), 1)
- self.assertTrue('address' in self.stats_httpd.config['listen_on'][0])
- self.assertTrue('port' in self.stats_httpd.config['listen_on'][0])
- self.assertTrue(server_address in set(self.stats_httpd.http_addrs))
- ans = send_command(
- isc.config.ccsession.COMMAND_GET_MODULE_SPEC,
- "ConfigManager", {"module_name":"StatsHttpd"})
- # assert StatsHttpd is added to ConfigManager
- self.assertNotEqual(ans, (0,{}))
- self.assertTrue(ans[1]['module_name'], 'StatsHttpd')
-
- def test_init_hterr(self):
- orig_open_httpd = stats_httpd.StatsHttpd.open_httpd
- def err_open_httpd(arg): raise stats_httpd.HttpServerError
- stats_httpd.StatsHttpd.open_httpd = err_open_httpd
- self.assertRaises(stats_httpd.HttpServerError, stats_httpd.StatsHttpd)
- ans = send_command(
- isc.config.ccsession.COMMAND_GET_MODULE_SPEC,
- "ConfigManager", {"module_name":"StatsHttpd"})
- # assert StatsHttpd is removed from ConfigManager
- self.assertEqual(ans, (0,{}))
- stats_httpd.StatsHttpd.open_httpd = orig_open_httpd
-
- def test_openclose_mccs(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- mccs = MockModuleCCSession()
- self.stats_httpd.mccs = mccs
- self.assertFalse(self.stats_httpd.mccs.stopped)
- self.assertFalse(self.stats_httpd.mccs.closed)
- self.stats_httpd.close_mccs()
- self.assertTrue(mccs.stopped)
- self.assertTrue(mccs.closed)
- self.assertEqual(self.stats_httpd.mccs, None)
- self.stats_httpd.open_mccs()
- self.assertIsNotNone(self.stats_httpd.mccs)
- self.stats_httpd.mccs = None
- self.assertEqual(self.stats_httpd.mccs, None)
- self.assertEqual(self.stats_httpd.close_mccs(), None)
-
- def test_mccs(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.assertIsNotNone(self.stats_httpd.mccs.get_socket())
- self.assertTrue(
- isinstance(self.stats_httpd.mccs.get_socket(), socket.socket))
- self.assertTrue(
- isinstance(self.stats_httpd.cc_session, isc.cc.session.Session))
- statistics_spec = self.stats_httpd.get_stats_spec()
- for mod in DUMMY_DATA:
- self.assertTrue(mod in statistics_spec)
- for cfg in statistics_spec[mod]:
- self.assertTrue('item_name' in cfg)
- self.assertTrue(cfg['item_name'] in DUMMY_DATA[mod])
- self.assertTrue(len(statistics_spec[mod]), len(DUMMY_DATA[mod]))
- self.stats_httpd.close_mccs()
- self.assertIsNone(self.stats_httpd.mccs)
-
- def test_httpd(self):
- # dual stack (addresses is ipv4 and ipv6)
- if self.ipv6_enabled:
- server_addresses = (get_availaddr('::1'), get_availaddr())
- self.stats_httpd = MyStatsHttpd(*server_addresses)
- for ht in self.stats_httpd.httpd:
- self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
- self.assertTrue(ht.address_family in set([socket.AF_INET, socket.AF_INET6]))
- self.assertTrue(isinstance(ht.socket, socket.socket))
-
- # dual stack (address is ipv6)
- if self.ipv6_enabled:
- server_addresses = get_availaddr('::1')
- self.stats_httpd = MyStatsHttpd(server_addresses)
- for ht in self.stats_httpd.httpd:
- self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
- self.assertEqual(ht.address_family, socket.AF_INET6)
- self.assertTrue(isinstance(ht.socket, socket.socket))
-
- # dual/single stack (address is ipv4)
- server_addresses = get_availaddr()
- self.stats_httpd = MyStatsHttpd(server_addresses)
- for ht in self.stats_httpd.httpd:
- self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
- self.assertEqual(ht.address_family, socket.AF_INET)
- self.assertTrue(isinstance(ht.socket, socket.socket))
-
- def test_httpd_anyIPv4(self):
- # any address (IPv4)
- server_addresses = get_availaddr(address='0.0.0.0')
- self.stats_httpd = MyStatsHttpd(server_addresses)
- for ht in self.stats_httpd.httpd:
- self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
- self.assertEqual(ht.address_family,socket.AF_INET)
- self.assertTrue(isinstance(ht.socket, socket.socket))
-
- def test_httpd_anyIPv6(self):
- # any address (IPv6)
- if self.ipv6_enabled:
- server_addresses = get_availaddr(address='::')
- self.stats_httpd = MyStatsHttpd(server_addresses)
- for ht in self.stats_httpd.httpd:
- self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
- self.assertEqual(ht.address_family,socket.AF_INET6)
- self.assertTrue(isinstance(ht.socket, socket.socket))
-
- def test_httpd_failed(self):
- # existent hostname
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
- get_availaddr(address='localhost'))
-
- # nonexistent hostname
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd, ('my.host.domain', 8000))
-
- # over flow of port number
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd, ('127.0.0.1', 80000))
-
- # negative
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd, ('127.0.0.1', -8000))
-
- # alphabet
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd, ('127.0.0.1', 'ABCDE'))
-
- # Address already in use
- server_addresses = get_availaddr()
- self.stats_httpd_server = ThreadingServerManager(MyStatsHttpd, server_addresses)
- self.stats_httpd_server.run()
- self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd, server_addresses)
- send_command("shutdown", "StatsHttpd")
-
- def test_running(self):
- self.stats_httpd_server = ThreadingServerManager(MyStatsHttpd, get_availaddr())
- self.stats_httpd = self.stats_httpd_server.server
- self.assertFalse(self.stats_httpd.running)
- self.stats_httpd_server.run()
- self.assertEqual(send_command("status", "StatsHttpd"),
- (0, "Stats Httpd is up. (PID " + str(os.getpid()) + ")"))
- self.assertTrue(self.stats_httpd.running)
- self.assertEqual(send_command("shutdown", "StatsHttpd"), (0, None))
- self.assertFalse(self.stats_httpd.running)
- self.stats_httpd_server.shutdown()
-
- # failure case
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.stats_httpd.cc_session.close()
- self.assertRaises(ValueError, self.stats_httpd.start)
-
- def test_failure_with_a_select_error (self):
- """checks select.error is raised if the exception except
- errno.EINTR is raised while it's selecting"""
- def raise_select_except(*args):
- raise select.error('dummy error')
- orig_select = stats_httpd.select.select
- stats_httpd.select.select = raise_select_except
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.assertRaises(select.error, self.stats_httpd.start)
- stats_httpd.select.select = orig_select
-
- def test_nofailure_with_errno_EINTR(self):
- """checks no exception is raised if errno.EINTR is raised
- while it's selecting"""
- def raise_select_except(*args):
- raise select.error(errno.EINTR)
- orig_select = stats_httpd.select.select
- stats_httpd.select.select = raise_select_except
- self.stats_httpd_server = ThreadingServerManager(MyStatsHttpd, get_availaddr())
- self.stats_httpd_server.run()
- self.stats_httpd_server.shutdown()
- stats_httpd.select.select = orig_select
-
- def test_open_template(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- # successful conditions
- tmpl = self.stats_httpd.open_template(stats_httpd.XML_TEMPLATE_LOCATION)
- self.assertTrue(isinstance(tmpl, string.Template))
- opts = dict(
- xml_string="<dummy></dummy>",
- xsl_url_path="/path/to/")
- lines = tmpl.substitute(opts)
- for n in opts:
- self.assertGreater(lines.find(opts[n]), 0)
- tmpl = self.stats_httpd.open_template(stats_httpd.XSD_TEMPLATE_LOCATION)
- self.assertTrue(isinstance(tmpl, string.Template))
- opts = dict(xsd_namespace="http://host/path/to/")
- lines = tmpl.substitute(opts)
- for n in opts:
- self.assertGreater(lines.find(opts[n]), 0)
- tmpl = self.stats_httpd.open_template(stats_httpd.XSL_TEMPLATE_LOCATION)
- self.assertTrue(isinstance(tmpl, string.Template))
- opts = dict(xsd_namespace="http://host/path/to/")
- lines = tmpl.substitute(opts)
- for n in opts:
- self.assertGreater(lines.find(opts[n]), 0)
- # unsuccessful condition
- self.assertRaises(
- stats_httpd.StatsHttpdDataError,
- self.stats_httpd.open_template, '/path/to/foo/bar')
-
- def test_commands(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.assertEqual(self.stats_httpd.command_handler("status", None),
- isc.config.ccsession.create_answer(
- 0, "Stats Httpd is up. (PID " + str(os.getpid()) + ")"))
- self.stats_httpd.running = True
- self.assertEqual(self.stats_httpd.command_handler("shutdown", None),
- isc.config.ccsession.create_answer(0))
- self.assertFalse(self.stats_httpd.running)
- self.assertEqual(
- self.stats_httpd.command_handler("__UNKNOWN_COMMAND__", None),
- isc.config.ccsession.create_answer(
- 1, "Unknown command: __UNKNOWN_COMMAND__"))
-
- def test_config(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- self.assertEqual(
- self.stats_httpd.config_handler(dict(_UNKNOWN_KEY_=None)),
- isc.config.ccsession.create_answer(
- 1, "unknown item _UNKNOWN_KEY_"))
-
- addresses = get_availaddr()
- self.assertEqual(
- self.stats_httpd.config_handler(
- dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
- isc.config.ccsession.create_answer(0))
- self.assertTrue("listen_on" in self.stats_httpd.config)
- for addr in self.stats_httpd.config["listen_on"]:
- self.assertTrue("address" in addr)
- self.assertTrue("port" in addr)
- self.assertTrue(addr["address"] == addresses[0])
- self.assertTrue(addr["port"] == addresses[1])
-
- if self.ipv6_enabled:
- addresses = get_availaddr("::1")
- self.assertEqual(
- self.stats_httpd.config_handler(
- dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
- isc.config.ccsession.create_answer(0))
- self.assertTrue("listen_on" in self.stats_httpd.config)
- for addr in self.stats_httpd.config["listen_on"]:
- self.assertTrue("address" in addr)
- self.assertTrue("port" in addr)
- self.assertTrue(addr["address"] == addresses[0])
- self.assertTrue(addr["port"] == addresses[1])
-
- addresses = get_availaddr()
- self.assertEqual(
- self.stats_httpd.config_handler(
- dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
- isc.config.ccsession.create_answer(0))
- self.assertTrue("listen_on" in self.stats_httpd.config)
- for addr in self.stats_httpd.config["listen_on"]:
- self.assertTrue("address" in addr)
- self.assertTrue("port" in addr)
- self.assertTrue(addr["address"] == addresses[0])
- self.assertTrue(addr["port"] == addresses[1])
- (ret, arg) = isc.config.ccsession.parse_answer(
- self.stats_httpd.config_handler(
- dict(listen_on=[dict(address="1.2.3.4",port=543210)]))
- )
- self.assertEqual(ret, 1)
-
- @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
- def test_xml_handler(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- module_name = 'Dummy'
- stats_spec = \
- { module_name :
- [{
- "item_name": "foo",
- "item_type": "string",
- "item_optional": False,
- "item_default": "bar",
- "item_description": "foo is bar",
- "item_title": "Foo"
- },
- {
- "item_name": "foo2",
- "item_type": "list",
- "item_optional": False,
- "item_default": [
- {
- "zonename" : "test1",
- "queries.udp" : 1,
- "queries.tcp" : 2
- },
- {
- "zonename" : "test2",
- "queries.udp" : 3,
- "queries.tcp" : 4
- }
- ],
- "item_title": "Foo bar",
- "item_description": "Foo bar",
- "list_item_spec": {
- "item_name": "foo2-1",
- "item_type": "map",
- "item_optional": False,
- "item_default": {},
- "map_item_spec": [
- {
- "item_name": "foo2-1-1",
- "item_type": "string",
- "item_optional": False,
- "item_default": "",
- "item_title": "Foo2 1 1",
- "item_description": "Foo bar"
- },
- {
- "item_name": "foo2-1-2",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Foo2 1 2",
- "item_description": "Foo bar"
- },
- {
- "item_name": "foo2-1-3",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Foo2 1 3",
- "item_description": "Foo bar"
- }
- ]
- }
- }]
- }
- stats_data = \
- { module_name : { 'foo':'bar',
- 'foo2': [
- {
- "foo2-1-1" : "bar1",
- "foo2-1-2" : 10,
- "foo2-1-3" : 9
- },
- {
- "foo2-1-1" : "bar2",
- "foo2-1-2" : 8,
- "foo2-1-3" : 7
- }
- ] } }
- self.stats_httpd.get_stats_spec = lambda x,y: stats_spec
- self.stats_httpd.get_stats_data = lambda x,y: stats_data
- xml_string = self.stats_httpd.xml_handler()
- stats_xml = xml.etree.ElementTree.fromstring(xml_string)
- schema_loc = '{%s}schemaLocation' % XMLNS_XSI
- self.assertEqual(stats_xml.attrib[schema_loc],
- stats_httpd.XML_ROOT_ATTRIB['xsi:schemaLocation'])
- stats_data = stats_data[module_name]
- stats_spec = stats_spec[module_name]
- names = stats_httpd.item_name_list(stats_data, '')
- for i in range(0, len(names)):
- self.assertEqual('%s/%s' % (module_name, names[i]), stats_xml[i].attrib['identifier'])
- value = isc.cc.data.find(stats_data, names[i])
- if type(value) is int:
- value = str(value)
- if type(value) is dict or type(value) is list:
- self.assertFalse('value' in stats_xml[i].attrib)
- else:
- self.assertEqual(value, stats_xml[i].attrib['value'])
- self.assertEqual(module_name, stats_xml[i].attrib['owner'])
- self.assertEqual(urllib.parse.quote('%s/%s/%s' % (stats_httpd.XML_URL_PATH,
- module_name, names[i])),
- stats_xml[i].attrib['uri'])
- spec = isc.config.find_spec_part(stats_spec, names[i])
- self.assertEqual(spec['item_name'], stats_xml[i].attrib['name'])
- self.assertEqual(spec['item_type'], stats_xml[i].attrib['type'])
- self.assertEqual(spec['item_description'], stats_xml[i].attrib['description'])
- self.assertEqual(spec['item_title'], stats_xml[i].attrib['title'])
- self.assertEqual(str(spec['item_optional']).lower(), stats_xml[i].attrib['optional'])
- default = spec['item_default']
- if type(default) is int:
- default = str(default)
- if type(default) is dict or type(default) is list:
- self.assertFalse('default' in stats_xml[i].attrib)
- else:
- self.assertEqual(default, stats_xml[i].attrib['default'])
- self.assertFalse('item_format' in spec)
- self.assertFalse('format' in stats_xml[i].attrib)
-
- @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
- def test_xsd_handler(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- xsd_string = self.stats_httpd.xsd_handler()
- stats_xsd = xml.etree.ElementTree.fromstring(xsd_string)
- ns = '{%s}' % XMLNS_XSD
- stats_xsd = stats_xsd[1].find('%scomplexType/%ssequence/%selement' % ((ns,)*3))
- self.assertEqual('item', stats_xsd.attrib['name'])
- stats_xsd = stats_xsd.find('%scomplexType' % ns)
- type_types = ('boolean', 'integer', 'real', 'string', 'map', \
- 'list', 'named_set', 'any')
- attribs = [('identifier', 'string', 'required'),
- ('value', 'string', 'optional'),
- ('owner', 'string', 'required'),
- ('uri', 'anyURI', 'required'),
- ('name', 'string', 'required'),
- ('type', type_types, 'required'),
- ('description', 'string', 'optional'),
- ('title', 'string', 'optional'),
- ('optional', 'boolean', 'optional'),
- ('default', 'string', 'optional'),
- ('format', 'string', 'optional')]
- for i in range(0, len(attribs)):
- self.assertEqual(attribs[i][0], stats_xsd[i].attrib['name'])
- if attribs[i][0] == 'type':
- stats_xsd_types = \
- stats_xsd[i].find('%ssimpleType/%srestriction' % \
- ((ns,)*2))
- for j in range(0, len(attribs[i][1])):
- self.assertEqual(attribs[i][1][j], \
- stats_xsd_types[j].attrib['value'])
- else:
- self.assertEqual(attribs[i][1], stats_xsd[i].attrib['type'])
- self.assertEqual(attribs[i][2], stats_xsd[i].attrib['use'])
-
- @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
- def test_xsl_handler(self):
- self.stats_httpd = MyStatsHttpd(get_availaddr())
- xsl_string = self.stats_httpd.xsl_handler()
- stats_xsl = xml.etree.ElementTree.fromstring(xsl_string)
- nst = '{%s}' % XMLNS_XSL
- nsx = '{%s}' % XMLNS_XHTML
- self.assertEqual("bind10:statistics", stats_xsl[2].attrib['match'])
- stats_xsl = stats_xsl[2].find('%stable' % nsx)
- self.assertEqual('item', stats_xsl[1].attrib['select'])
- stats_xsl = stats_xsl[1].find('%str' % nsx)
- self.assertEqual('@uri', stats_xsl[0].find(
- '%selement/%sattribute/%svalue-of' % ((nst,)*3)).attrib['select'])
- self.assertEqual('@identifier', stats_xsl[0].find(
- '%selement/%svalue-of' % ((nst,)*2)).attrib['select'])
- self.assertEqual('@value', stats_xsl[1].find('%sif' % nst).attrib['test'])
- self.assertEqual('@value', stats_xsl[1].find('%sif/%svalue-of' % ((nst,)*2)).attrib['select'])
- self.assertEqual('@description', stats_xsl[2].find('%sif' % nst).attrib['test'])
- self.assertEqual('@description', stats_xsl[2].find('%sif/%svalue-of' % ((nst,)*2)).attrib['select'])
-
- def test_for_without_B10_FROM_SOURCE(self):
- # just lets it go through the code without B10_FROM_SOURCE env
- # variable
- if "B10_FROM_SOURCE" in os.environ:
- tmppath = os.environ["B10_FROM_SOURCE"]
- os.environ.pop("B10_FROM_SOURCE")
- imp.reload(stats_httpd)
- os.environ["B10_FROM_SOURCE"] = tmppath
- imp.reload(stats_httpd)
-
-if __name__ == "__main__":
- isc.log.resetUnitTestRootLogger()
- unittest.main()
diff --git a/src/bin/stats/tests/b10-stats_test.py b/src/bin/stats/tests/b10-stats_test.py
deleted file mode 100644
index 540c707..0000000
--- a/src/bin/stats/tests/b10-stats_test.py
+++ /dev/null
@@ -1,1379 +0,0 @@
-# Copyright (C) 2010, 2011, 2012 Internet Systems Consortium.
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notice and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
-# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
-# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
-# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
-# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-"""
-In each of these tests we start several virtual components. They are
-not the real components, no external processes are started. They are
-just simple mock objects running each in its own thread and pretending
-to be bind10 modules. This helps testing the stats module in a close
-to real environment.
-"""
-
-import unittest
-import os
-import threading
-import io
-import time
-import imp
-import sys
-
-import stats
-import isc.log
-import isc.cc.session
-from test_utils import BaseModules, ThreadingServerManager, MyStats, \
- SimpleStats, SignalHandler, MyModuleCCSession, send_command
-from isc.testutils.ccsession_mock import MockModuleCCSession
-
-class TestUtilties(unittest.TestCase):
- items = [
- { 'item_name': 'test_int1', 'item_type': 'integer', 'item_default': 12345 },
- { 'item_name': 'test_real1', 'item_type': 'real', 'item_default': 12345.6789 },
- { 'item_name': 'test_bool1', 'item_type': 'boolean', 'item_default': True },
- { 'item_name': 'test_str1', 'item_type': 'string', 'item_default': 'ABCD' },
- { 'item_name': 'test_list1', 'item_type': 'list', 'item_default': [1,2,3],
- 'list_item_spec' : { 'item_name': 'number', 'item_type': 'integer' } },
- { 'item_name': 'test_map1', 'item_type': 'map', 'item_default': {'a':1,'b':2,'c':3},
- 'map_item_spec' : [ { 'item_name': 'a', 'item_type': 'integer'},
- { 'item_name': 'b', 'item_type': 'integer'},
- { 'item_name': 'c', 'item_type': 'integer'} ] },
- { 'item_name': 'test_int2', 'item_type': 'integer' },
- { 'item_name': 'test_real2', 'item_type': 'real' },
- { 'item_name': 'test_bool2', 'item_type': 'boolean' },
- { 'item_name': 'test_str2', 'item_type': 'string' },
- { 'item_name': 'test_list2', 'item_type': 'list',
- 'list_item_spec' : { 'item_name': 'number', 'item_type': 'integer' } },
- { 'item_name': 'test_map2', 'item_type': 'map',
- 'map_item_spec' : [ { 'item_name': 'A', 'item_type': 'integer'},
- { 'item_name': 'B', 'item_type': 'integer'},
- { 'item_name': 'C', 'item_type': 'integer'} ] },
- { 'item_name': 'test_none', 'item_type': 'none' },
- { 'item_name': 'test_list3', 'item_type': 'list', 'item_default': ["one","two","three"],
- 'list_item_spec' : { 'item_name': 'number', 'item_type': 'string' } },
- { 'item_name': 'test_map3', 'item_type': 'map', 'item_default': {'a':'one','b':'two','c':'three'},
- 'map_item_spec' : [ { 'item_name': 'a', 'item_type': 'string'},
- { 'item_name': 'b', 'item_type': 'string'},
- { 'item_name': 'c', 'item_type': 'string'} ] },
- {
- 'item_name': 'test_named_set',
- 'item_type': 'named_set',
- 'item_default': { },
- 'named_set_item_spec': {
- 'item_name': 'name',
- 'item_type': 'map',
- 'item_default': { },
- 'map_item_spec': [
- {
- 'item_name': 'number1',
- 'item_type': 'integer'
- },
- {
- 'item_name': 'number2',
- 'item_type': 'integer'
- }
- ]
- }
- }
- ]
-
- def setUp(self):
- self.const_timestamp = 1308730448.965706
- self.const_timetuple = (2011, 6, 22, 8, 14, 8, 2, 173, 0)
- self.const_datetime = '2011-06-22T08:14:08Z'
- stats.time = lambda : self.const_timestamp
- stats.gmtime = lambda : self.const_timetuple
-
- def test_get_spec_defaults(self):
- self.assertEqual(
- stats.get_spec_defaults(self.items), {
- 'test_int1' : 12345 ,
- 'test_real1' : 12345.6789 ,
- 'test_bool1' : True ,
- 'test_str1' : 'ABCD' ,
- 'test_list1' : [1,2,3] ,
- 'test_map1' : {'a':1,'b':2,'c':3},
- 'test_int2' : 0 ,
- 'test_real2' : 0.0,
- 'test_bool2' : False,
- 'test_str2' : "",
- 'test_list2' : [0],
- 'test_map2' : { 'A' : 0, 'B' : 0, 'C' : 0 },
- 'test_none' : None,
- 'test_list3' : [ "one", "two", "three" ],
- 'test_map3' : { 'a' : 'one', 'b' : 'two', 'c' : 'three' },
- 'test_named_set' : {} })
- self.assertEqual(stats.get_spec_defaults(None), {})
- self.assertRaises(KeyError, stats.get_spec_defaults, [{'item_name':'Foo'}])
-
- def test_get_timestamp(self):
- self.assertEqual(stats.get_timestamp(), self.const_timestamp)
-
- def test_get_datetime(self):
- self.assertEqual(stats.get_datetime(), self.const_datetime)
- self.assertNotEqual(stats.get_datetime(
- (2011, 6, 22, 8, 23, 40, 2, 173, 0)), self.const_datetime)
-
- def test__accum(self):
- self.assertEqual(stats._accum(None, None), None)
- self.assertEqual(stats._accum(None, "b"), "b")
- self.assertEqual(stats._accum("a", None), "a")
- self.assertEqual(stats._accum(1, 2), 3)
- self.assertEqual(stats._accum(0.5, 0.3), 0.8)
- self.assertEqual(stats._accum('aa','bb'), 'bb')
- self.assertEqual(stats._accum('1970-01-01T09:00:00Z','2012-08-09T09:33:31Z'),
- '2012-08-09T09:33:31Z')
- self.assertEqual(stats._accum(
- [1, 2, 3], [4, 5]), [5, 7, 3])
- self.assertEqual(stats._accum(
- [4, 5], [1, 2, 3]), [5, 7, 3])
- self.assertEqual(stats._accum(
- [1, 2, 3], [None, 5, 6]), [1, 7, 9])
- self.assertEqual(stats._accum(
- [None, 5, 6], [1, 2, 3]), [1, 7, 9])
- self.assertEqual(stats._accum(
- [1, 2, 3], [None, None, None, None]), [1,2,3,None])
- self.assertEqual(stats._accum(
- [[1,2],3],[[],5,6]), [[1,2],8,6])
- self.assertEqual(stats._accum(
- {'one': 1, 'two': 2, 'three': 3},
- {'one': 4, 'two': 5}),
- {'one': 5, 'two': 7, 'three': 3})
- self.assertEqual(stats._accum(
- {'one': 1, 'two': 2, 'three': 3},
- {'four': 4, 'five': 5}),
- {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5})
- self.assertEqual(stats._accum(
- {'one': [1, 2], 'two': [3, None, 5], 'three': [None, 3, None]},
- {'one': [2], 'two': [4, 5], 'three': [None, None, None], 'four': 'FOUR'}),
- {'one':[3,2], 'two':[7,5,5], 'three':[None,3,None], 'four': 'FOUR'})
- self.assertEqual(stats._accum(
- [ {'one': 1, 'two': 2, 'three': 3}, {'four': 4, 'five': 5, 'six': 6} ],
- [ {}, {'four': 1, 'five': 2, 'six': 3} ]),
- [ {'one': 1, 'two': 2, 'three': 3}, {'four': 5, 'five': 7, 'six': 9} ])
-
- def test_merge_oldnre(self):
- self.assertEqual(stats.merge_oldnew(1, 2), 2)
- self.assertEqual(stats.merge_oldnew(0.5, 0.3), 0.3)
- self.assertEqual(stats.merge_oldnew('aa','bb'), 'bb')
- self.assertEqual(stats.merge_oldnew(
- [1, 2, 3], [4, 5]), [4, 5, 3])
- self.assertEqual(stats.merge_oldnew(
- [4, 5], [1, 2, 3]), [1, 2, 3])
- self.assertEqual(stats.merge_oldnew(
- [1, 2, 3], [None, 5, 6]), [None, 5, 6])
- self.assertEqual(stats.merge_oldnew(
- [None, 5, 6], [1, 2, 3]), [1, 2, 3])
- self.assertEqual(stats.merge_oldnew(
- [1, 2, 3], [None, None, None, None]), [None, None, None, None])
- self.assertEqual(stats.merge_oldnew(
- [[1,2],3],[[],5,6]), [[1,2],5,6])
- self.assertEqual(stats.merge_oldnew(
- {'one': 1, 'two': 2, 'three': 3},
- {'one': 4, 'two': 5}),
- {'one': 4, 'two': 5, 'three': 3})
- self.assertEqual(stats.merge_oldnew(
- {'one': 1, 'two': 2, 'three': 3},
- {'four': 4, 'five': 5}),
- {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5})
- self.assertEqual(stats.merge_oldnew(
- {'one': [1, 2], 'two': [3, None, 5], 'three': [None, 3, None]},
- {'one': [2], 'two': [4, 5], 'three': [None, None, None], 'four': 'FOUR'}),
- {'one':[2,2], 'two':[4,5,5], 'three':[None,None,None], 'four': 'FOUR'})
- self.assertEqual(stats.merge_oldnew(
- [ {'one': 1, 'two': 2, 'three': 3}, {'four': 4, 'five': 5, 'six': 6} ],
- [ {}, {'four': 1, 'five': 2, 'six': 3} ]),
- [ {'one': 1, 'two': 2, 'three': 3}, {'four': 1, 'five': 2, 'six': 3} ])
-
-class TestCallback(unittest.TestCase):
- def setUp(self):
- self.dummy_func = lambda *x, **y : (x, y)
- self.dummy_args = (1,2,3)
- self.dummy_kwargs = {'a':1,'b':2,'c':3}
- self.cback1 = stats.Callback(
- command=self.dummy_func,
- args=self.dummy_args,
- kwargs=self.dummy_kwargs
- )
- self.cback2 = stats.Callback(
- args=self.dummy_args,
- kwargs=self.dummy_kwargs
- )
- self.cback3 = stats.Callback(
- command=self.dummy_func,
- kwargs=self.dummy_kwargs
- )
- self.cback4 = stats.Callback(
- command=self.dummy_func,
- args=self.dummy_args
- )
-
- def test_init(self):
- self.assertEqual((self.cback1.command, self.cback1.args, self.cback1.kwargs),
- (self.dummy_func, self.dummy_args, self.dummy_kwargs))
- self.assertEqual((self.cback2.command, self.cback2.args, self.cback2.kwargs),
- (None, self.dummy_args, self.dummy_kwargs))
- self.assertEqual((self.cback3.command, self.cback3.args, self.cback3.kwargs),
- (self.dummy_func, (), self.dummy_kwargs))
- self.assertEqual((self.cback4.command, self.cback4.args, self.cback4.kwargs),
- (self.dummy_func, self.dummy_args, {}))
-
- def test_call(self):
- self.assertEqual(self.cback1(), (self.dummy_args, self.dummy_kwargs))
- self.assertEqual(self.cback1(100, 200), ((100, 200), self.dummy_kwargs))
- self.assertEqual(self.cback1(a=100, b=200), (self.dummy_args, {'a':100, 'b':200}))
- self.assertEqual(self.cback2(), None)
- self.assertEqual(self.cback3(), ((), self.dummy_kwargs))
- self.assertEqual(self.cback3(100, 200), ((100, 200), self.dummy_kwargs))
- self.assertEqual(self.cback3(a=100, b=200), ((), {'a':100, 'b':200}))
- self.assertEqual(self.cback4(), (self.dummy_args, {}))
- self.assertEqual(self.cback4(100, 200), ((100, 200), {}))
- self.assertEqual(self.cback4(a=100, b=200), (self.dummy_args, {'a':100, 'b':200}))
-
-class TestStats(unittest.TestCase):
- def setUp(self):
- # set the signal handler for deadlock
- self.sig_handler = SignalHandler(self.fail)
- self.base = BaseModules()
- self.const_timestamp = 1308730448.965706
- self.const_datetime = '2011-06-22T08:14:08Z'
- self.const_default_datetime = '1970-01-01T00:00:00Z'
- # Record original module-defined functions in case we replace them
- self.__orig_timestamp = stats.get_timestamp
- self.__orig_get_datetime = stats.get_datetime
-
- def tearDown(self):
- self.base.shutdown()
- # reset the signal handler
- self.sig_handler.reset()
- # restore the stored original function in case we replaced them
- stats.get_timestamp = self.__orig_timestamp
- stats.get_datetime = self.__orig_get_datetime
-
- def test_init(self):
- self.stats = stats.Stats()
- self.assertEqual(self.stats.module_name, 'Stats')
- self.assertFalse(self.stats.running)
- self.assertTrue('command_show' in self.stats.callbacks)
- self.assertTrue('command_status' in self.stats.callbacks)
- self.assertTrue('command_shutdown' in self.stats.callbacks)
- self.assertTrue('command_show' in self.stats.callbacks)
- self.assertTrue('command_showschema' in self.stats.callbacks)
- self.assertEqual(self.stats.config['poll-interval'], 60)
-
- def test_init_undefcmd(self):
- spec_str = """\
-{
- "module_spec": {
- "module_name": "Stats",
- "module_description": "Stats daemon",
- "config_data": [],
- "commands": [
- {
- "command_name": "_undef_command_",
- "command_description": "a undefined command in stats",
- "command_args": []
- }
- ],
- "statistics": []
- }
-}
-"""
- orig_spec_location = stats.SPECFILE_LOCATION
- stats.SPECFILE_LOCATION = io.StringIO(spec_str)
- self.assertRaises(stats.StatsError, stats.Stats)
- stats.SPECFILE_LOCATION = orig_spec_location
-
- def __send_command(self, stats, command_name, params=None):
- '''Emulate a command arriving to stats by directly calling callback'''
- return isc.config.ccsession.parse_answer(
- stats.command_handler(command_name, params))
-
- def test_start(self):
- # Define a separate exception class so we can be sure that's actually
- # the one raised in __check_start() below
- class CheckException(Exception):
- pass
-
- def __check_start(tested_stats):
- self.assertTrue(tested_stats.running)
- raise CheckException # terminate the loop
-
- # start without err
- stats = SimpleStats()
- self.assertFalse(stats.running)
- stats._check_command = lambda: __check_start(stats)
- # We are going to confirm start() will set running to True, avoiding
- # to fall into a loop with the exception trick.
- self.assertRaises(CheckException, stats.start)
- self.assertEqual(self.__send_command(stats, "status"),
- (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
-
- def test_shutdown(self):
- def __check_shutdown(tested_stats):
- self.assertTrue(tested_stats.running)
- self.assertEqual(self.__send_command(tested_stats, "shutdown"),
- (0, None))
- self.assertFalse(tested_stats.running)
- # override get_interval() so it won't go poll statistics
- tested_stats.get_interval = lambda : 0
-
- stats = SimpleStats()
- stats._check_command = lambda: __check_shutdown(stats)
- stats.start()
- self.assertTrue(stats.mccs.stopped)
-
- def test_handlers(self):
- """Test command_handler"""
-
- __stats = SimpleStats()
-
- # 'show' command. We're going to check the expected methods are
- # called in the expected order, and check the resulting response.
- # Details of each method are tested separately.
- call_log = []
- def __steal_method(fn_name, *arg):
- call_log.append((fn_name, arg))
- if fn_name == 'update_stat':
- return False # "no error"
- if fn_name == 'showschema':
- return isc.config.create_answer(0, 'no error')
-
- # Fake some methods and attributes for inspection
- __stats.do_polling = lambda: __steal_method('polling')
- __stats.update_statistics_data = \
- lambda x, y, z: __steal_method('update_stat', x, y, z)
- __stats.update_modules = lambda: __steal_method('update_module')
- __stats.mccs.lname = 'test lname'
- __stats.statistics_data = {'Init': {'boot_time': self.const_datetime}}
-
- # skip initial polling
- stats.get_timestamp = lambda: 0
- __stats._lasttime_poll = 0
-
- stats.get_datetime = lambda: 42 # make the result predictable
-
- # now send the command
- self.assertEqual(
- self.__send_command(
- __stats, 'show',
- params={ 'owner' : 'Init', 'name' : 'boot_time' }),
- (0, {'Init': {'boot_time': self.const_datetime}}))
- # Check if expected methods are called
- self.assertEqual([('update_stat',
- ('Stats', 'test lname',
- {'timestamp': 0,
- 'report_time': 42})),
- ('update_module', ())], call_log)
-
- # Then update faked timestamp so the initial polling will happen, and
- # confirm that.
- call_log = []
- stats.get_timestamp = lambda: 10
- self.assertEqual(
- self.__send_command(
- __stats, 'show',
- params={ 'owner' : 'Init', 'name' : 'boot_time' }),
- (0, {'Init': {'boot_time': self.const_datetime}}))
- self.assertEqual([('polling', ()),
- ('update_stat',
- ('Stats', 'test lname',
- {'timestamp': 10,
- 'report_time': 42})),
- ('update_module', ())], call_log)
-
- # 'status' command. We can confirm the behavior without any fake
- self.assertEqual(
- self.__send_command(__stats, 'status'),
- (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
-
- # 'showschema' command. update_modules() will be called, which
- # (implicitly) confirms the correct method is called; further details
- # are tested separately.
- call_log = []
- (rcode, value) = self.__send_command(__stats, 'showschema')
- self.assertEqual([('update_module', ())], call_log)
-
- # Unknown command. Error should be returned
- self.assertEqual(
- self.__send_command(__stats, '__UNKNOWN__'),
- (1, "Unknown command: '__UNKNOWN__'"))
-
- def test_update_modules(self):
- """Confirm the behavior of Stats.update_modules().
-
- It checks whether the expected command is sent to ConfigManager,
- and whether the answer from ConfigManager is handled as expected.
-
- """
-
- def __check_rpc_call(command, group):
- self.assertEqual('ConfigManager', group)
- self.assertEqual(command,
- isc.config.ccsession.COMMAND_GET_STATISTICS_SPEC)
- answer_value = {'Init': [{
- "item_name": "boot_time",
- "item_type": "string",
- "item_optional": False,
- # Use a different default so we can check it below
- "item_default": "2013-01-01T00:00:01Z",
- "item_title": "Boot time",
- "item_description": "dummy desc",
- "item_format": "date-time"
- }]}
- return answer_value
-
- self.stats = SimpleStats()
- self.stats.cc_session.rpc_call = __check_rpc_call
-
- self.stats.update_modules()
-
- # Stats is always incorporated. For others, only the ones returned
- # by group_recvmsg() above is available.
- self.assertTrue('Stats' in self.stats.modules)
- self.assertTrue('Init' in self.stats.modules)
- self.assertFalse('Dummy' in self.stats.modules)
-
- my_statistics_data = stats.get_spec_defaults(
- self.stats.modules['Stats'].get_statistics_spec())
- self.assertTrue('report_time' in my_statistics_data)
- self.assertTrue('boot_time' in my_statistics_data)
- self.assertTrue('last_update_time' in my_statistics_data)
- self.assertTrue('timestamp' in my_statistics_data)
- self.assertTrue('lname' in my_statistics_data)
- self.assertEqual(my_statistics_data['report_time'],
- self.const_default_datetime)
- self.assertEqual(my_statistics_data['boot_time'],
- self.const_default_datetime)
- self.assertEqual(my_statistics_data['last_update_time'],
- self.const_default_datetime)
- self.assertEqual(my_statistics_data['timestamp'], 0.0)
- self.assertEqual(my_statistics_data['lname'], "")
- my_statistics_data = stats.get_spec_defaults(
- self.stats.modules['Init'].get_statistics_spec())
- self.assertTrue('boot_time' in my_statistics_data)
- self.assertEqual(my_statistics_data['boot_time'],
- "2013-01-01T00:00:01Z")
-
- # Error case
- def __raise_on_rpc_call(x, y):
- raise isc.config.RPCError(99, 'error')
- orig_parse_answer = stats.isc.config.ccsession.parse_answer
- self.stats.cc_session.rpc_call = __raise_on_rpc_call
- self.assertRaises(stats.StatsError, self.stats.update_modules)
-
- def test_get_statistics_data(self):
- """Confirm the behavior of Stats.get_statistics_data().
-
- It should first call update_modules(), and then retrieve the requested
- data from statistics_data. We confirm this by fake update_modules()
- where we set the expected data in statistics_data.
-
- """
- self.stats = SimpleStats()
- def __faked_update_modules():
- self.stats.statistics_data = { \
- 'Stats': {
- 'report_time': self.const_default_datetime,
- 'boot_time': None,
- 'last_update_time': None,
- 'timestamp': 0.0,
- 'lname': 'dummy name'
- },
- 'Init': { 'boot_time': None }
- }
-
- self.stats.update_modules = __faked_update_modules
-
- my_statistics_data = self.stats.get_statistics_data()
- self.assertTrue('Stats' in my_statistics_data)
- self.assertTrue('Init' in my_statistics_data)
- self.assertTrue('boot_time' in my_statistics_data['Init'])
-
- my_statistics_data = self.stats.get_statistics_data(owner='Stats')
- self.assertTrue('Stats' in my_statistics_data)
- self.assertTrue('report_time' in my_statistics_data['Stats'])
- self.assertTrue('boot_time' in my_statistics_data['Stats'])
- self.assertTrue('last_update_time' in my_statistics_data['Stats'])
- self.assertTrue('timestamp' in my_statistics_data['Stats'])
- self.assertTrue('lname' in my_statistics_data['Stats'])
- self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
- owner='Foo')
-
- my_statistics_data = self.stats.get_statistics_data(
- owner='Stats', name='report_time')
- self.assertEqual(my_statistics_data['Stats']['report_time'],
- self.const_default_datetime)
-
- my_statistics_data = self.stats.get_statistics_data(
- owner='Stats', name='boot_time')
- self.assertTrue('boot_time' in my_statistics_data['Stats'])
-
- my_statistics_data = self.stats.get_statistics_data(
- owner='Stats', name='last_update_time')
- self.assertTrue('last_update_time' in my_statistics_data['Stats'])
-
- my_statistics_data = self.stats.get_statistics_data(
- owner='Stats', name='timestamp')
- self.assertEqual(my_statistics_data['Stats']['timestamp'], 0.0)
-
- my_statistics_data = self.stats.get_statistics_data(
- owner='Stats', name='lname')
- self.assertTrue(len(my_statistics_data['Stats']['lname']) >0)
- self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
- owner='Stats', name='Bar')
- self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
- owner='Foo', name='Bar')
- self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
- name='Bar')
-
- def test_update_statistics_data(self):
- """test for list-type statistics"""
- self.stats = SimpleStats()
- _test_exp1 = {
- 'zonename': 'test1.example',
- 'queries.tcp': 5,
- 'queries.udp': 4
- }
- _test_exp2 = {
- 'zonename': 'test2.example',
- 'queries.tcp': 3,
- 'queries.udp': 2
- }
- _test_exp3 = {}
- _test_exp4 = {
- 'queries.udp': 4
- }
- _test_exp5_1 = {
- 'queries.perzone': [
- { },
- {
- 'queries.udp': 9876
- }
- ]
- }
- _test_exp5_2 = {
- 'queries.perzone[1]/queries.udp':
- isc.cc.data.find(_test_exp5_1,
- 'queries.perzone[1]/queries.udp')
- }
- # Success cases
- self.assertEqual(self.stats.statistics_data['Stats']['lname'],
- self.stats.cc_session.lname)
- self.stats.update_statistics_data(
- 'Stats', self.stats.cc_session.lname,
- {'lname': 'foo at bar'})
- self.assertEqual(self.stats.statistics_data['Stats']['lname'],
- 'foo at bar')
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'queries.perzone': [_test_exp1]}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['queries.perzone'],\
- [_test_exp1])
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'queries.perzone': [_test_exp2]}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['queries.perzone'],\
- [_test_exp2])
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'queries.perzone': [_test_exp1,_test_exp2]}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['queries.perzone'],
- [_test_exp1,_test_exp2])
- # differential update
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'queries.perzone': [_test_exp3,_test_exp4]}))
- _new_data = stats.merge_oldnew(_test_exp2,_test_exp4)
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['queries.perzone'], \
- [_test_exp1,_new_data])
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', _test_exp5_2))
- _new_data = stats.merge_oldnew(_new_data,
- _test_exp5_1['queries.perzone'][1])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['queries.perzone'], \
- [_test_exp1,_new_data])
- # Error cases
- self.assertEqual(self.stats.update_statistics_data('Stats', None,
- {'lname': 0.0}),
- ['0.0 should be a string'])
- self.assertEqual(self.stats.update_statistics_data('Dummy', None,
- {'foo': 'bar'}),
- ['unknown module name: Dummy'])
- self.assertEqual(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'queries.perzone': [None]}), ['None should be a map'])
-
- def test_update_statistics_data_pt2(self):
- """test for named_set-type statistics"""
- self.stats = SimpleStats()
- _test_exp1 = \
- { 'test10.example': { 'queries.tcp': 5, 'queries.udp': 4 } }
- _test_exp2 = \
- { 'test20.example': { 'queries.tcp': 3, 'queries.udp': 2 } }
- _test_exp3 = {}
- _test_exp4 = { 'test20.example': { 'queries.udp': 4 } }
- _test_exp5_1 = { 'test10.example': { 'queries.udp': 5432 } }
- _test_exp5_2 ={
- 'nds_queries.perzone/test10.example/queries.udp':
- isc.cc.data.find(_test_exp5_1, 'test10.example/queries.udp')
- }
- _test_exp6 = { 'foo/bar': 'brabra' }
- _test_exp7 = { 'foo[100]': 'bar' }
- # Success cases
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'nds_queries.perzone': _test_exp1}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- _test_exp1)
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'nds_queries.perzone': _test_exp2}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- dict(_test_exp1,**_test_exp2))
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'nds_queries.perzone':
- dict(_test_exp1, **_test_exp2)}))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],
- dict(_test_exp1, **_test_exp2))
- # differential update
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'nds_queries.perzone':
- dict(_test_exp3, **_test_exp4)}))
- _new_val = dict(_test_exp1,
- **stats.merge_oldnew(_test_exp2,_test_exp4))
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- _new_val)
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo1', _test_exp5_2))
- _new_val = stats.merge_oldnew(_new_val, _test_exp5_1)
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- _new_val)
- self.assertIsNone(self.stats.update_statistics_data(
- 'Auth', 'foo2', _test_exp5_2))
- _new_val = stats.merge_oldnew(_new_val, _test_exp5_1)
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo2']['nds_queries.perzone'],\
- _test_exp5_1)
- # Error cases
- self.assertEqual(self.stats.update_statistics_data(
- 'Auth', 'foo1', {'nds_queries.perzone': None}),
- ['None should be a map'])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- _new_val)
- self.assertEqual(self.stats.update_statistics_data(
- 'Auth', 'foo1', _test_exp6), ['unknown item foo'])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']\
- ['foo1']['nds_queries.perzone'],\
- _new_val)
- self.assertEqual(self.stats.update_statistics_data(
- 'Init', 'bar1', _test_exp7), ["KeyError: 'foo'"])
- self.assertEqual(self.stats.update_statistics_data(
- 'Foo', 'foo1', _test_exp6), ['unknown module name: Foo'])
-
- def test_update_statistics_data_withmid(self):
- self.stats = SimpleStats()
-
- # This test relies on existing statistics data at the Stats object.
- # This version of test prepares the data using the do_polling() method;
- # that's a bad practice because a unittest for a method
- # (update_statistics_data) would heavily depend on details of another
- # method (do_polling). However, there's currently no direct test
- # for do_polling (which is also bad), so we still keep that approach,
- # partly for testing do_polling indirectly. #2781 should provide
- # direct test for do_polling, with which this test scenario should
- # also be changed to be more stand-alone.
-
- # We use the knowledge of what kind of messages are sent via
- # do_polling, and return the following faked answer directly.
- create_answer = isc.config.ccsession.create_answer # shortcut
- self.stats._answers = [\
- # Answer for "show_processes"
- (create_answer(0, [[1034, 'b10-auth-1', 'Auth'],
- [1035, 'b10-auth-2', 'Auth']]), None),
- # Answers for "getstats". 2 for Auth instances and 1 for Init.
- # we return some bogus values for Init, but the rest of the test
- # doesn't need it, so it's okay.
- (create_answer(0, self.stats._auth_sdata), {'from': 'auth1'}),
- (create_answer(0, self.stats._auth_sdata), {'from': 'auth2'}),
- (create_answer(0, self.stats._auth_sdata), {'from': 'auth3'})
- ]
- # do_polling calls update_modules internally; in our scenario there's
- # no change in modules, so we make it no-op.
- self.stats.update_modules = lambda: None
- # Now call do_polling.
- self.stats.do_polling()
-
- # samples of query number
- bar1_tcp = 1001
- bar2_tcp = 2001
- bar3_tcp = 1002
- bar3_udp = 1003
- # two auth instances invoked, so we double the pre-set stat values
- sum_qtcp = self.stats._queries_tcp * 2
- sum_qudp = self.stats._queries_udp * 2
- self.stats.update_statistics_data('Auth', "bar1 at foo",
- {'queries.tcp': bar1_tcp})
- self.assertTrue('Auth' in self.stats.statistics_data)
- self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
- self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
- bar1_tcp + sum_qtcp)
- self.assertTrue('Auth' in self.stats.statistics_data_bymid)
- self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
- self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid
- ['Auth']['bar1 at foo'])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo'],
- {'queries.tcp': bar1_tcp})
- # check consolidation of statistics data even if there is
- # non-existent mid of Auth
- self.stats.update_statistics_data('Auth', "bar2 at foo",
- {'queries.tcp': bar2_tcp})
- self.assertTrue('Auth' in self.stats.statistics_data)
- self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
- self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
- bar1_tcp + bar2_tcp + sum_qtcp)
- self.assertTrue('Auth' in self.stats.statistics_data_bymid)
- self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
- self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid['Auth']['bar1 at foo'])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo'],
- {'queries.tcp': bar1_tcp})
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar2 at foo'],
- {'queries.tcp': bar2_tcp})
- # kill running Auth but the statistics data doesn't change
- self.base.auth2.server.shutdown()
- self.stats.update_statistics_data()
- self.assertTrue('Auth' in self.stats.statistics_data)
- self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
- self.assertTrue('queries.udp' in self.stats.statistics_data['Auth'])
- self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
- bar1_tcp + bar2_tcp + sum_qtcp)
- self.assertEqual(self.stats.statistics_data['Auth']['queries.udp'],
- sum_qudp)
- self.assertTrue('Auth' in self.stats.statistics_data_bymid)
- # restore statistics data of killed auth
- # self.base.b10_init.server.pid_list = [ killed ] + self.base.b10_init.server.pid_list[:]
- self.stats.update_statistics_data('Auth',
- "bar1 at foo",
- {'queries.tcp': bar1_tcp})
- # set another mid of Auth
- self.stats.update_statistics_data('Auth',
- "bar3 at foo",
- {'queries.tcp':bar3_tcp,
- 'queries.udp':bar3_udp})
- self.assertTrue('Auth' in self.stats.statistics_data)
- self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
- self.assertTrue('queries.udp' in self.stats.statistics_data['Auth'])
- self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
- bar1_tcp + bar2_tcp + bar3_tcp + sum_qtcp)
- self.assertEqual(self.stats.statistics_data['Auth']['queries.udp'],
- bar3_udp + sum_qudp)
- self.assertTrue('Auth' in self.stats.statistics_data_bymid)
- self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
- self.assertTrue('bar3 at foo' in self.stats.statistics_data_bymid['Auth'])
- self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid['Auth']['bar1 at foo'])
- self.assertTrue('queries.udp' in self.stats.statistics_data_bymid['Auth']['bar3 at foo'])
- self.assertTrue('queries.udp' in self.stats.statistics_data_bymid['Auth']['bar3 at foo'])
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo']['queries.tcp'], bar1_tcp)
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar3 at foo']['queries.tcp'], bar3_tcp)
- self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar3 at foo']['queries.udp'], bar3_udp)
-
- def test_config(self):
- orig_get_timestamp = stats.get_timestamp
- stats.get_timestamp = lambda : self.const_timestamp
- stat = SimpleStats()
-
- # test updating poll-interval
- self.assertEqual(stat.config['poll-interval'], 60)
- self.assertEqual(stat.get_interval(), 60)
- self.assertEqual(stat.next_polltime, self.const_timestamp + 60)
- self.assertEqual(stat.config_handler({'poll-interval': 120}),
- isc.config.create_answer(0))
- self.assertEqual(stat.config['poll-interval'], 120)
- self.assertEqual(stat.get_interval(), 120)
- self.assertEqual(stat.next_polltime, self.const_timestamp + 120)
- stats.get_timestamp = orig_get_timestamp
- self.assertEqual(stat.config_handler({'poll-interval': "foo"}),
- isc.config.create_answer(1, 'foo should be an integer'))
- self.assertEqual(stat.config_handler({'poll-interval': -1}),
- isc.config.create_answer(1, 'Negative integer ignored'))
- # unknown item
- self.assertEqual(
- stat.config_handler({'_UNKNOWN_KEY_': None}),
- isc.config.ccsession.create_answer(
- 1, "unknown item _UNKNOWN_KEY_"))
- # test no change if zero interval time
- self.assertEqual(stat.config_handler({'poll-interval': 0}),
- isc.config.create_answer(0))
- self.assertEqual(stat.config['poll-interval'], 0)
-
- # see the comment for test_update_statistics_data_withmid. We abuse
- # do_polling here, too. With #2781 we should make it more direct.
- create_answer = isc.config.ccsession.create_answer # shortcut
- stat._answers = [\
- # Answer for "show_processes"
- (create_answer(0, []), None),
- # Answers for "getstats" for Init (the other one for Auth, but
- # that doesn't matter for this test)
- (create_answer(0, stat._init_sdata), {'from': 'init'}),
- (create_answer(0, stat._init_sdata), {'from': 'init'})
- ]
- stat.update_modules = lambda: None
-
- self.assertEqual(
- self.__send_command(
- stat, 'show',
- params={ 'owner' : 'Init', 'name' : 'boot_time' }),
- (0, {'Init': {'boot_time': self.const_datetime}}))
-
- def test_commands(self):
- self.stats = stats.Stats()
-
- # status
- self.assertEqual(self.stats.command_status(),
- isc.config.create_answer(
- 0, "Stats is up. (PID " + str(os.getpid()) + ")"))
-
- # shutdown
- self.stats.running = True
- self.assertEqual(self.stats.command_shutdown(),
- isc.config.create_answer(0))
- self.assertFalse(self.stats.running)
-
- @unittest.skipIf(sys.version_info >= (3, 3), "Unsupported in Python 3.3 or higher")
- def test_command_show(self):
- # two auth instances invoked
- list_auth = [ self.base.auth.server,
- self.base.auth2.server ]
- sum_qtcp = 0
- sum_qudp = 0
- sum_qtcp_perzone1 = 0
- sum_qudp_perzone1 = 0
- sum_qtcp_perzone2 = 4 * len(list_auth)
- sum_qudp_perzone2 = 3 * len(list_auth)
- sum_qtcp_nds_perzone10 = 0
- sum_qudp_nds_perzone10 = 0
- sum_qtcp_nds_perzone20 = 4 * len(list_auth)
- sum_qudp_nds_perzone20 = 3 * len(list_auth)
- self.stats = stats.Stats()
- self.assertEqual(self.stats.command_show(owner='Foo', name=None),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Foo, name: None"))
- self.assertEqual(self.stats.command_show(owner='Foo', name='_bar_'),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Foo, name: _bar_"))
- self.assertEqual(self.stats.command_show(owner='Foo', name='bar'),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Foo, name: bar"))
-
- for a in list_auth:
- sum_qtcp += a.queries_tcp
- sum_qudp += a.queries_udp
- sum_qtcp_perzone1 += a.queries_per_zone[0]['queries.tcp']
- sum_qudp_perzone1 += a.queries_per_zone[0]['queries.udp']
- sum_qtcp_nds_perzone10 += a.nds_queries_per_zone['test10.example']['queries.tcp']
- sum_qudp_nds_perzone10 += a.nds_queries_per_zone['test10.example']['queries.udp']
-
- self.assertEqual(self.stats.command_show(owner='Auth'),
- isc.config.create_answer(
- 0, {'Auth':{ 'queries.udp': sum_qudp,
- 'queries.tcp': sum_qtcp,
- 'queries.perzone': [{ 'zonename': 'test1.example',
- 'queries.udp': sum_qudp_perzone1,
- 'queries.tcp': sum_qtcp_perzone1 },
- { 'zonename': 'test2.example',
- 'queries.udp': sum_qudp_perzone2,
- 'queries.tcp': sum_qtcp_perzone2 }
- ],
- 'nds_queries.perzone': { 'test10.example' : {
- 'queries.udp': sum_qudp_nds_perzone10,
- 'queries.tcp': sum_qtcp_nds_perzone10 },
- 'test20.example' : {
- 'queries.udp': sum_qudp_nds_perzone20,
- 'queries.tcp': sum_qtcp_nds_perzone20 }
- }}}))
- self.assertEqual(self.stats.command_show(owner='Auth', name='queries.udp'),
- isc.config.create_answer(
- 0, {'Auth': {'queries.udp': sum_qudp}}))
- self.assertEqual(self.stats.command_show(owner='Auth', name='queries.perzone'),
- isc.config.create_answer(
- 0, {'Auth': {'queries.perzone': [
- { 'zonename': 'test1.example',
- 'queries.udp': sum_qudp_perzone1,
- 'queries.tcp': sum_qtcp_perzone1 },
- { 'zonename': 'test2.example',
- 'queries.udp': sum_qudp_perzone2,
- 'queries.tcp': sum_qtcp_perzone2 }]}}))
- self.assertEqual(self.stats.command_show(owner='Auth', name='nds_queries.perzone'),
- isc.config.create_answer(
- 0, {'Auth': {'nds_queries.perzone': {
- 'test10.example': {
- 'queries.udp': sum_qudp_nds_perzone10,
- 'queries.tcp': sum_qtcp_nds_perzone10 },
- 'test20.example': {
- 'queries.udp': sum_qudp_nds_perzone20,
- 'queries.tcp': sum_qtcp_nds_perzone20 }}}}))
- orig_get_datetime = stats.get_datetime
- orig_get_timestamp = stats.get_timestamp
- stats.get_datetime = lambda x=None: self.const_datetime
- stats.get_timestamp = lambda : self.const_timestamp
- self.assertEqual(self.stats.command_show(owner='Stats', name='report_time'),
- isc.config.create_answer(
- 0, {'Stats': {'report_time':self.const_datetime}}))
- self.assertEqual(self.stats.command_show(owner='Stats', name='timestamp'),
- isc.config.create_answer(
- 0, {'Stats': {'timestamp':self.const_timestamp}}))
- stats.get_datetime = orig_get_datetime
- stats.get_timestamp = orig_get_timestamp
- self.stats.modules[self.stats.module_name] = isc.config.module_spec.ModuleSpec(
- { "module_name": self.stats.module_name,
- "statistics": [] } )
- self.assertRaises(
- stats.StatsError, self.stats.command_show, owner=self.stats.module_name, name='bar')
-
- def test_command_showchema(self):
- self.stats = stats.Stats()
- (rcode, value) = isc.config.ccsession.parse_answer(
- self.stats.command_showschema())
- self.assertEqual(rcode, 0)
- self.assertEqual(len(value), 3)
- self.assertTrue('Stats' in value)
- self.assertTrue('Init' in value)
- self.assertTrue('Auth' in value)
- self.assertFalse('__Dummy__' in value)
- schema = value['Stats']
- self.assertEqual(len(schema), 5)
- for item in schema:
- self.assertTrue(len(item) == 6 or len(item) == 7)
- self.assertTrue('item_name' in item)
- self.assertTrue('item_type' in item)
- self.assertTrue('item_optional' in item)
- self.assertTrue('item_default' in item)
- self.assertTrue('item_title' in item)
- self.assertTrue('item_description' in item)
- if len(item) == 7:
- self.assertTrue('item_format' in item)
-
- schema = value['Init']
- self.assertEqual(len(schema), 1)
- for item in schema:
- self.assertTrue(len(item) == 7)
- self.assertTrue('item_name' in item)
- self.assertTrue('item_type' in item)
- self.assertTrue('item_optional' in item)
- self.assertTrue('item_default' in item)
- self.assertTrue('item_title' in item)
- self.assertTrue('item_description' in item)
- self.assertTrue('item_format' in item)
-
- schema = value['Auth']
- self.assertEqual(len(schema), 4)
- for item in schema:
- if item['item_type'] == 'list' or item['item_type'] == 'named_set':
- self.assertEqual(len(item), 7)
- else:
- self.assertEqual(len(item), 6)
- self.assertTrue('item_name' in item)
- self.assertTrue('item_type' in item)
- self.assertTrue('item_optional' in item)
- self.assertTrue('item_default' in item)
- self.assertTrue('item_title' in item)
- self.assertTrue('item_description' in item)
-
- (rcode, value) = isc.config.ccsession.parse_answer(
- self.stats.command_showschema(owner='Stats'))
- self.assertEqual(rcode, 0)
- self.assertTrue('Stats' in value)
- self.assertFalse('Init' in value)
- self.assertFalse('Auth' in value)
- for item in value['Stats']:
- self.assertTrue(len(item) == 6 or len(item) == 7)
- self.assertTrue('item_name' in item)
- self.assertTrue('item_type' in item)
- self.assertTrue('item_optional' in item)
- self.assertTrue('item_default' in item)
- self.assertTrue('item_title' in item)
- self.assertTrue('item_description' in item)
- if len(item) == 7:
- self.assertTrue('item_format' in item)
-
- (rcode, value) = isc.config.ccsession.parse_answer(
- self.stats.command_showschema(owner='Stats', name='report_time'))
- self.assertEqual(rcode, 0)
- self.assertTrue('Stats' in value)
- self.assertFalse('Init' in value)
- self.assertFalse('Auth' in value)
- self.assertEqual(len(value['Stats'][0]), 7)
- self.assertTrue('item_name' in value['Stats'][0])
- self.assertTrue('item_type' in value['Stats'][0])
- self.assertTrue('item_optional' in value['Stats'][0])
- self.assertTrue('item_default' in value['Stats'][0])
- self.assertTrue('item_title' in value['Stats'][0])
- self.assertTrue('item_description' in value['Stats'][0])
- self.assertTrue('item_format' in value['Stats'][0])
- self.assertEqual(value['Stats'][0]['item_name'], 'report_time')
- self.assertEqual(value['Stats'][0]['item_format'], 'date-time')
-
- self.assertEqual(self.stats.command_showschema(owner='Foo'),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Foo, name: None"))
- self.assertEqual(self.stats.command_showschema(owner='Foo', name='bar'),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Foo, name: bar"))
- self.assertEqual(self.stats.command_showschema(owner='Auth'),
- isc.config.create_answer(
- 0, {'Auth': [{
- "item_default": 0,
- "item_description": "A number of total query counts which all auth servers receive over TCP since they started initially",
- "item_name": "queries.tcp",
- "item_optional": False,
- "item_title": "Queries TCP",
- "item_type": "integer"
- },
- {
- "item_default": 0,
- "item_description": "A number of total query counts which all auth servers receive over UDP since they started initially",
- "item_name": "queries.udp",
- "item_optional": False,
- "item_title": "Queries UDP",
- "item_type": "integer"
- },
- {
- "item_name": "queries.perzone",
- "item_type": "list",
- "item_optional": False,
- "item_default": [
- {
- "zonename" : "test1.example",
- "queries.udp" : 1,
- "queries.tcp" : 2
- },
- {
- "zonename" : "test2.example",
- "queries.udp" : 3,
- "queries.tcp" : 4
- }
- ],
- "item_title": "Queries per zone",
- "item_description": "Queries per zone",
- "list_item_spec": {
- "item_name": "zones",
- "item_type": "map",
- "item_optional": False,
- "item_default": {},
- "map_item_spec": [
- {
- "item_name": "zonename",
- "item_type": "string",
- "item_optional": False,
- "item_default": "",
- "item_title": "Zonename",
- "item_description": "Zonename"
- },
- {
- "item_name": "queries.udp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries UDP per zone",
- "item_description": "A number of UDP query counts per zone"
- },
- {
- "item_name": "queries.tcp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries TCP per zone",
- "item_description": "A number of TCP query counts per zone"
- }
- ]
- }
- },
- {
- "item_name": "nds_queries.perzone",
- "item_type": "named_set",
- "item_optional": False,
- "item_default": {
- "test10.example" : {
- "queries.udp" : 1,
- "queries.tcp" : 2
- },
- "test20.example" : {
- "queries.udp" : 3,
- "queries.tcp" : 4
- }
- },
- "item_title": "Queries per zone",
- "item_description": "Queries per zone",
- "named_set_item_spec": {
- "item_name": "zonename",
- "item_type": "map",
- "item_optional": False,
- "item_default": {},
- "item_title": "Zonename",
- "item_description": "Zonename",
- "map_item_spec": [
- {
- "item_name": "queries.udp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries UDP per zone",
- "item_description": "A number of UDP query counts per zone"
- },
- {
- "item_name": "queries.tcp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries TCP per zone",
- "item_description": "A number of TCP query counts per zone"
- }
- ]
- }
- }]}))
- self.assertEqual(self.stats.command_showschema(owner='Auth', name='queries.tcp'),
- isc.config.create_answer(
- 0, {'Auth': [{
- "item_default": 0,
- "item_description": "A number of total query counts which all auth servers receive over TCP since they started initially",
- "item_name": "queries.tcp",
- "item_optional": False,
- "item_title": "Queries TCP",
- "item_type": "integer"
- }]}))
- self.assertEqual(self.stats.command_showschema(owner='Auth', name='queries.perzone'),
- isc.config.create_answer(
- 0, {'Auth':[{
- "item_name": "queries.perzone",
- "item_type": "list",
- "item_optional": False,
- "item_default": [
- {
- "zonename" : "test1.example",
- "queries.udp" : 1,
- "queries.tcp" : 2
- },
- {
- "zonename" : "test2.example",
- "queries.udp" : 3,
- "queries.tcp" : 4
- }
- ],
- "item_title": "Queries per zone",
- "item_description": "Queries per zone",
- "list_item_spec": {
- "item_name": "zones",
- "item_type": "map",
- "item_optional": False,
- "item_default": {},
- "map_item_spec": [
- {
- "item_name": "zonename",
- "item_type": "string",
- "item_optional": False,
- "item_default": "",
- "item_title": "Zonename",
- "item_description": "Zonename"
- },
- {
- "item_name": "queries.udp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries UDP per zone",
- "item_description": "A number of UDP query counts per zone"
- },
- {
- "item_name": "queries.tcp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries TCP per zone",
- "item_description": "A number of TCP query counts per zone"
- }
- ]
- }
- }]}))
- self.assertEqual(self.stats.command_showschema(owner='Auth', name='nds_queries.perzone'),
- isc.config.create_answer(
- 0, {'Auth':[{
- "item_name": "nds_queries.perzone",
- "item_type": "named_set",
- "item_optional": False,
- "item_default": {
- "test10.example" : {
- "queries.udp" : 1,
- "queries.tcp" : 2
- },
- "test20.example" : {
- "queries.udp" : 3,
- "queries.tcp" : 4
- }
- },
- "item_title": "Queries per zone",
- "item_description": "Queries per zone",
- "named_set_item_spec": {
- "item_name": "zonename",
- "item_type": "map",
- "item_optional": False,
- "item_default": {},
- "item_title": "Zonename",
- "item_description": "Zonename",
- "map_item_spec": [
- {
- "item_name": "queries.udp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries UDP per zone",
- "item_description": "A number of UDP query counts per zone"
- },
- {
- "item_name": "queries.tcp",
- "item_type": "integer",
- "item_optional": False,
- "item_default": 0,
- "item_title": "Queries TCP per zone",
- "item_description": "A number of TCP query counts per zone"
- }
- ]
- }
- }]}))
-
- self.assertEqual(self.stats.command_showschema(owner='Stats', name='bar'),
- isc.config.create_answer(
- 1, "specified arguments are incorrect: owner: Stats, name: bar"))
- self.assertEqual(self.stats.command_showschema(name='bar'),
- isc.config.create_answer(
- 1, "module name is not specified"))
-
- @unittest.skipIf(sys.version_info >= (3, 3), "Unsupported in Python 3.3 or higher")
- def test_polling(self):
- stats_server = ThreadingServerManager(MyStats)
- stat = stats_server.server
- stats_server.run()
- self.assertEqual(
- send_command('show', 'Stats'),
- (0, stat.statistics_data))
- # check statistics data of 'Init'
- b10_init = self.base.b10_init.server
- self.assertEqual(
- stat.statistics_data_bymid['Init'][b10_init.cc_session.lname],
- {'boot_time': self.const_datetime})
- self.assertEqual(
- len(stat.statistics_data_bymid['Init']), 1)
- self.assertEqual(
- stat.statistics_data['Init'],
- {'boot_time': self.const_datetime})
- # check statistics data of each 'Auth' instances
- list_auth = ['', '2']
- for i in list_auth:
- auth = getattr(self.base,"auth"+i).server
- for s in stat.statistics_data_bymid['Auth'].values():
- self.assertEqual(
- s, {'queries.perzone': auth.queries_per_zone,
- 'nds_queries.perzone': auth.nds_queries_per_zone,
- 'queries.tcp': auth.queries_tcp,
- 'queries.udp': auth.queries_udp})
- n = len(stat.statistics_data_bymid['Auth'])
- self.assertEqual(n, len(list_auth))
- # check consolidation of statistics data of the auth
- # instances
- self.assertEqual(
- stat.statistics_data['Auth'],
- {'queries.perzone': [
- {'zonename':
- auth.queries_per_zone[0]['zonename'],
- 'queries.tcp':
- auth.queries_per_zone[0]['queries.tcp']*n,
- 'queries.udp':
- auth.queries_per_zone[0]['queries.udp']*n},
- {'zonename': "test2.example",
- 'queries.tcp': 4*n,
- 'queries.udp': 3*n },
- ],
- 'nds_queries.perzone': {
- 'test10.example': {
- 'queries.tcp':
- auth.nds_queries_per_zone['test10.example']['queries.tcp']*n,
- 'queries.udp':
- auth.nds_queries_per_zone['test10.example']['queries.udp']*n},
- 'test20.example': {
- 'queries.tcp':
- 4*n,
- 'queries.udp':
- 3*n},
- },
- 'queries.tcp': auth.queries_tcp*n,
- 'queries.udp': auth.queries_udp*n})
- # check statistics data of 'Stats'
- self.assertEqual(
- len(stat.statistics_data['Stats']), 5)
- self.assertTrue('boot_time' in
- stat.statistics_data['Stats'])
- self.assertTrue('last_update_time' in
- stat.statistics_data['Stats'])
- self.assertTrue('report_time' in
- stat.statistics_data['Stats'])
- self.assertTrue('timestamp' in
- stat.statistics_data['Stats'])
- self.assertEqual(
- stat.statistics_data['Stats']['lname'],
- stat.mccs._session.lname)
- stats_server.shutdown()
-
- def test_polling2(self):
- # set invalid statistics
- b10_init = self.base.b10_init.server
- b10_init.statistics_data = {'boot_time':1}
- stats_server = ThreadingServerManager(MyStats)
- stat = stats_server.server
- stats_server.run()
- self.assertEqual(
- send_command('status', 'Stats'),
- (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
- # check default statistics data of 'Init'
- self.assertEqual(
- stat.statistics_data['Init'],
- {'boot_time': self.const_default_datetime})
- stats_server.shutdown()
-
-class TestOSEnv(unittest.TestCase):
- def test_osenv(self):
- """
- test for the environ variable "B10_FROM_SOURCE"
- "B10_FROM_SOURCE" is set in Makefile
- """
- # test case having B10_FROM_SOURCE
- self.assertTrue("B10_FROM_SOURCE" in os.environ)
- self.assertEqual(stats.SPECFILE_LOCATION, \
- os.environ["B10_FROM_SOURCE"] + os.sep + \
- "src" + os.sep + "bin" + os.sep + "stats" + \
- os.sep + "stats.spec")
- # test case not having B10_FROM_SOURCE
- path = os.environ["B10_FROM_SOURCE"]
- os.environ.pop("B10_FROM_SOURCE")
- self.assertFalse("B10_FROM_SOURCE" in os.environ)
- # import stats again
- imp.reload(stats)
- # revert the changes
- os.environ["B10_FROM_SOURCE"] = path
- imp.reload(stats)
-
-if __name__ == "__main__":
- isc.log.resetUnitTestRootLogger()
- unittest.main()
diff --git a/src/bin/stats/tests/stats-httpd_test.py b/src/bin/stats/tests/stats-httpd_test.py
new file mode 100644
index 0000000..84fac44
--- /dev/null
+++ b/src/bin/stats/tests/stats-httpd_test.py
@@ -0,0 +1,1127 @@
+# Copyright (C) 2011-2012 Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+In each of these tests we start several virtual components. They are
+not the real components, no external processes are started. They are
+just simple mock objects running each in its own thread and pretending
+to be bind10 modules. This helps testing the stats http server in a
+close to real environment.
+"""
+
+import unittest
+import os
+import imp
+import socket
+import errno
+import select
+import string
+import time
+import threading
+import http.client
+import xml.etree.ElementTree
+import random
+import urllib.parse
+import sys
+# load this module for xml validation with xsd. For this test, an
+# installation of lxml is required in advance. See http://lxml.de/.
+try:
+ from lxml import etree as lxml_etree
+except ImportError:
+ lxml_etree = None
+
+import isc
+import isc.log
+import stats_httpd
+import stats
+from test_utils import ThreadingServerManager, SignalHandler, \
+ MyStatsHttpd, CONST_BASETIME
+from isc.testutils.ccsession_mock import MockModuleCCSession
+from isc.config import RPCRecipientMissing, RPCError
+
+# This test suite uses xml.etree.ElementTree.XMLParser via
+# xml.etree.ElementTree.parse. On the platform where expat isn't
+# installed, ImportError is raised and it's failed. Check expat is
+# available before the test invocation. Skip this test if it's
+# unavailable.
+try:
+ # ImportError raised if xpat is unavailable
+ xml_parser = xml.etree.ElementTree.XMLParser()
+except ImportError:
+ xml_parser = None
+
+# set XML Namespaces for testing
+XMLNS_XSL = "http://www.w3.org/1999/XSL/Transform"
+XMLNS_XHTML = "http://www.w3.org/1999/xhtml"
+XMLNS_XSD = "http://www.w3.org/2001/XMLSchema"
+XMLNS_XSI = stats_httpd.XMLNS_XSI
+
+DUMMY_DATA = {
+ 'Init' : {
+ "boot_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME)
+ },
+ 'Auth' : {
+ "queries.tcp": 6,
+ "queries.udp": 4,
+ "queries.perzone": [{
+ "zonename": "test1.example",
+ "queries.tcp": 10,
+ "queries.udp": 8
+ }, {
+ "zonename": "test2.example",
+ "queries.tcp": 8,
+ "queries.udp": 6
+ }],
+ "nds_queries.perzone": {
+ "test10.example": {
+ "queries.tcp": 10,
+ "queries.udp": 8
+ },
+ "test20.example": {
+ "queries.tcp": 8,
+ "queries.udp": 6
+ }
+ }
+ },
+ 'Stats' : {
+ "report_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ "boot_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ "last_update_time": time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ "lname": "4d70d40a_c at host",
+ "timestamp": time.mktime(CONST_BASETIME)
+ }
+ }
+
+# Bad practice: this should be localized
+stats._BASETIME = CONST_BASETIME
+stats.get_timestamp = lambda: time.mktime(CONST_BASETIME)
+stats.get_datetime = lambda x=None: time.strftime("%Y-%m-%dT%H:%M:%SZ", CONST_BASETIME)
+
+def get_availaddr(address='127.0.0.1', port=8001):
+ """returns a tuple of address and port which is available to
+ listen on the platform. The first argument is a address for
+ search. The second argument is a port for search. If a set of
+ address and port is failed on the search for the availability, the
+ port number is increased and it goes on the next trial until the
+ available set of address and port is looked up. If the port number
+ reaches over 65535, it may stop the search and raise a
+ OverflowError exception."""
+ while True:
+ for addr in socket.getaddrinfo(
+ address, port, 0,
+ socket.SOCK_STREAM, socket.IPPROTO_TCP):
+ sock = socket.socket(addr[0], socket.SOCK_STREAM)
+ try:
+ sock.bind((address, port))
+ return (address, port)
+ except socket.error:
+ continue
+ finally:
+ if sock: sock.close()
+ # This address and port number are already in use.
+ # next port number is added
+ port = port + 1
+
+def is_ipv6_enabled(address='::1', port=8001):
+ """checks IPv6 enabled on the platform. address for check is '::1'
+ and port for check is random number between 8001 and
+ 65535. Retrying is 3 times even if it fails. The built-in socket
+ module provides a 'has_ipv6' parameter, but it's not used here
+ because there may be a situation where the value is True on an
+ environment where the IPv6 config is disabled."""
+ for p in random.sample(range(port, 65535), 3):
+ try:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.bind((address, p))
+ return True
+ except socket.error:
+ continue
+ finally:
+ if sock: sock.close()
+ return False
+
+class TestItemNameList(unittest.TestCase):
+
+ def test_item_name_list(self):
+ # for a one-element list
+ self.assertEqual(['a'],
+ stats_httpd.item_name_list({'a':1}, 'a'))
+ # for a dict under a dict
+ self.assertEqual(['a','a/b'],
+ stats_httpd.item_name_list({'a':{'b':1}}, 'a'))
+ self.assertEqual(['a/b'],
+ stats_httpd.item_name_list({'a':{'b':1}}, 'a/b'))
+ self.assertEqual(['a','a/b','a/b/c'],
+ stats_httpd.item_name_list({'a':{'b':{'c':1}}}, 'a'))
+ self.assertEqual(['a/b','a/b/c'],
+ stats_httpd.item_name_list({'a':{'b':{'c':1}}},
+ 'a/b'))
+ self.assertEqual(['a/b/c'],
+ stats_httpd.item_name_list({'a':{'b':{'c':1}}},
+ 'a/b/c'))
+ # for a list under a dict
+ self.assertEqual(['a[2]'],
+ stats_httpd.item_name_list({'a':[1,2,3]}, 'a[2]'))
+ self.assertEqual(['a', 'a[0]', 'a[1]', 'a[2]'],
+ stats_httpd.item_name_list({'a':[1,2,3]}, 'a'))
+ self.assertEqual(['a', 'a[0]', 'a[1]', 'a[2]'],
+ stats_httpd.item_name_list({'a':[1,2,3]}, ''))
+ # for a list under a dict under a dict
+ self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
+ stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, 'a'))
+ self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
+ stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, ''))
+ self.assertEqual(['a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
+ stats_httpd.item_name_list({'a':{'b':[1,2,3]}}, 'a/b'))
+ # for a mixed case of the above
+ self.assertEqual(['a', 'a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]', 'a/c'],
+ stats_httpd.item_name_list(
+ {'a':{'b':[1,2,3], 'c':1}}, 'a'))
+ self.assertEqual(['a/b', 'a/b[0]', 'a/b[1]', 'a/b[2]'],
+ stats_httpd.item_name_list(
+ {'a':{'b':[1,2,3], 'c':1}}, 'a/b'))
+ self.assertEqual(['a/c'],
+ stats_httpd.item_name_list(
+ {'a':{'b':[1,2,3], 'c':1}}, 'a/c'))
+ # for specifying a wrong identifier which is not found in
+ # element
+ self.assertRaises(isc.cc.data.DataNotFoundError,
+ stats_httpd.item_name_list, {'x':1}, 'a')
+ # for specifying a string in element and an empty string in
+ # identifier
+ self.assertEqual([],
+ stats_httpd.item_name_list('a', ''))
+ # for specifying empty strings in element and identifier
+ self.assertEqual([],
+ stats_httpd.item_name_list('', ''))
+ # for specifying wrong element, which is an non-empty string,
+ # and an non-empty string in identifier
+ self.assertRaises(isc.cc.data.DataTypeError,
+ stats_httpd.item_name_list, 'a', 'a')
+ # for specifying None in element and identifier
+ self.assertRaises(isc.cc.data.DataTypeError,
+ stats_httpd.item_name_list, None, None)
+ # for specifying non-dict in element
+ self.assertRaises(isc.cc.data.DataTypeError,
+ stats_httpd.item_name_list, [1,2,3], 'a')
+ self.assertRaises(isc.cc.data.DataTypeError,
+ stats_httpd.item_name_list, [1,2,3], '')
+ # for checking key names sorted which consist of element
+ num = 11
+ keys = [ 'a', 'aa', 'b' ]
+ keys.sort(reverse=True)
+ dictlist = dict([ (k, list(range(num))) for k in keys ])
+ keys.sort()
+ ans = []
+ for k in keys:
+ ans += [k] + [ '%s[%d]' % (k, i) for i in range(num) ]
+ self.assertEqual(ans,
+ stats_httpd.item_name_list(dictlist, ''))
+
+class TestHttpHandler(unittest.TestCase):
+ """Tests for HttpHandler class"""
+ def setUp(self):
+ # set the signal handler for deadlock
+ self.sig_handler = SignalHandler(self.fail)
+ DUMMY_DATA['Stats']['lname'] = 'test-lname'
+ (self.address, self.port) = get_availaddr()
+ self.stats_httpd_server = ThreadingServerManager(MyStatsHttpd,
+ (self.address,
+ self.port))
+ self.stats_httpd = self.stats_httpd_server.server
+ self.stats_httpd_server.run()
+ self.client = http.client.HTTPConnection(self.address, self.port)
+ self.client._http_vsn_str = 'HTTP/1.0\n'
+ self.client.connect()
+
+ def tearDown(self):
+ self.client.close()
+ # reset the signal handler
+ self.sig_handler.reset()
+
+ @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
+ def test_do_GET(self):
+ self.assertTrue(type(self.stats_httpd.httpd) is list)
+ self.assertEqual(len(self.stats_httpd.httpd), 1)
+ self.assertEqual((self.address, self.port), self.stats_httpd.http_addrs[0])
+
+ def check_XML_URL_PATH(path=''):
+ url_path = '%s/%s' % (stats_httpd.XML_URL_PATH, path)
+ url_path = urllib.parse.quote(url_path)
+ self.client.putrequest('GET', url_path)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.getheader("Content-type"), "text/xml")
+ self.assertGreater(int(response.getheader("Content-Length")), 0)
+ self.assertEqual(response.status, 200)
+ xml_doctype = response.readline().decode()
+ xsl_doctype = response.readline().decode()
+ self.assertGreater(len(xml_doctype), 0)
+ self.assertGreater(len(xsl_doctype), 0)
+ root = xml.etree.ElementTree.parse(response).getroot()
+ self.assertGreater(root.tag.find('statistics'), 0)
+ schema_loc = '{%s}schemaLocation' % XMLNS_XSI
+ # check the path of XSD
+ self.assertEqual(root.attrib[schema_loc],
+ stats_httpd.XSD_NAMESPACE + ' '
+ + stats_httpd.XSD_URL_PATH)
+ # check the path of XSL
+ self.assertTrue(xsl_doctype.startswith(
+ '<?xml-stylesheet type="text/xsl" href="' +
+ stats_httpd.XSL_URL_PATH
+ + '"?>'))
+ # check whether the list of 'identifier' attributes in
+ # root is same as the list of item names in DUMMY_DATA
+ id_list = [ elm.attrib['identifier'] for elm in root ]
+ item_list = [ it for it in \
+ stats_httpd.item_name_list(DUMMY_DATA, path) \
+ if len(it.split('/')) > 1 ]
+ self.assertEqual(id_list, item_list)
+ for elem in root:
+ attr = elem.attrib
+ value = isc.cc.data.find(DUMMY_DATA, attr['identifier'])
+ # No 'value' attribute should be found in the 'item'
+ # element when datatype of the value is list or dict.
+ if type(value) is list or type(value) is dict:
+ self.assertFalse('value' in attr)
+ # The value of the 'value' attribute should be checked
+ # after casting it to string type if datatype of the
+ # value is int or float. Because attr['value'] returns
+ # string type even if its value is int or float.
+ elif type(value) is int or type(value) is float:
+ self.assertEqual(attr['value'], str(value))
+ else:
+ self.assertEqual(attr['value'], value)
+
+ # URL is '/bind10/statistics/xml'
+ check_XML_URL_PATH()
+ for path in stats_httpd.item_name_list(DUMMY_DATA, ''):
+ check_XML_URL_PATH(path)
+
+ def check_XSD_URL_PATH():
+ url_path = stats_httpd.XSD_URL_PATH
+ url_path = urllib.parse.quote(url_path)
+ self.client.putrequest('GET', url_path)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.getheader("Content-type"), "text/xml")
+ self.assertGreater(int(response.getheader("Content-Length")), 0)
+ self.assertEqual(response.status, 200)
+ root = xml.etree.ElementTree.parse(response).getroot()
+ url_xmlschema = '{%s}' % XMLNS_XSD
+ self.assertGreater(root.tag.find('schema'), 0)
+ self.assertTrue(hasattr(root, 'attrib'))
+ self.assertTrue('targetNamespace' in root.attrib)
+ self.assertEqual(root.attrib['targetNamespace'],
+ stats_httpd.XSD_NAMESPACE)
+
+ # URL is '/bind10/statistics/xsd'
+ check_XSD_URL_PATH()
+
+ def check_XSL_URL_PATH():
+ url_path = stats_httpd.XSL_URL_PATH
+ url_path = urllib.parse.quote(url_path)
+ self.client.putrequest('GET', url_path)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.getheader("Content-type"), "text/xml")
+ self.assertGreater(int(response.getheader("Content-Length")), 0)
+ self.assertEqual(response.status, 200)
+ root = xml.etree.ElementTree.parse(response).getroot()
+ url_trans = '{%s}' % XMLNS_XSL
+ url_xhtml = '{%s}' % XMLNS_XHTML
+ self.assertEqual(root.tag, url_trans + 'stylesheet')
+
+ # URL is '/bind10/statistics/xsl'
+ check_XSL_URL_PATH()
+
+ # 302 redirect
+ self.client._http_vsn_str = 'HTTP/1.1'
+ self.client.putrequest('GET', '/')
+ self.client.putheader('Host', self.address)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 302)
+ self.assertEqual(response.getheader('Location'),
+ "http://%s:%d%s/" % (self.address, self.port, stats_httpd.XML_URL_PATH))
+
+ # 404 NotFound (random path)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', '/path/to/foo/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', '/bind10/foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', '/bind10/statistics/foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + 'Auth') # with no slash
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ # 200 ok
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/#foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/?foo=bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ # 404 NotFound (too long path)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Init/boot_time/a')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ # 404 NotFound (nonexistent module name)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Foo')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ # 404 NotFound (nonexistent item name)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Foo/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Foo/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Foo/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ # 404 NotFound (existent module but nonexistent item name)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/Auth/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSD_URL_PATH + '/Auth/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+ self.client._http_vsn_str = 'HTTP/1.0'
+ self.client.putrequest('GET', stats_httpd.XSL_URL_PATH + '/Auth/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ def test_do_GET_failed1(self):
+ # failure case (Stats is down, so rpc_call() results in an exception)
+ # Note: this should eventually be RPCRecipientMissing.
+ self.stats_httpd._rpc_answers.append(
+ isc.cc.session.SessionTimeout('timeout'))
+
+ # request XML
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 500)
+
+ # request XSD
+ self.client.putrequest('GET', stats_httpd.XSD_URL_PATH)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ # request XSL
+ self.client.putrequest('GET', stats_httpd.XSL_URL_PATH)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ def test_do_GET_failed2(self):
+ # failure case(Stats replies an error)
+ self.stats_httpd._rpc_answers.append(
+ RPCError(1, "specified arguments are incorrect: I have an error."))
+
+ # request XML
+ self.client.putrequest('GET', stats_httpd.XML_URL_PATH + '/')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ # request XSD
+ self.stats_httpd._rpc_answers.append(
+ RPCError(1, "specified arguments are incorrect: I have an error."))
+ self.client.putrequest('GET', stats_httpd.XSD_URL_PATH)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ # request XSL
+ self.stats_httpd._rpc_answers.append(
+ RPCError(1, "specified arguments are incorrect: I have an error."))
+ self.client.putrequest('GET', stats_httpd.XSL_URL_PATH)
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ def test_do_HEAD(self):
+ self.client.putrequest('HEAD', stats_httpd.XML_URL_PATH + '/')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 200)
+
+ self.client.putrequest('HEAD', '/path/to/foo/bar')
+ self.client.endheaders()
+ response = self.client.getresponse()
+ self.assertEqual(response.status, 404)
+
+ @unittest.skipUnless(lxml_etree, "skipping XML validation with XSD")
+ def test_xml_validation_with_xsd(self):
+ """Tests for XML validation with XSD. If lxml is not
+ installed, this tests would be skipped."""
+ def request_xsd():
+ url_path = stats_httpd.XSD_URL_PATH
+ url_path = urllib.parse.quote(url_path)
+ self.client.putrequest('GET', url_path)
+ self.client.endheaders()
+ xsd_doc = self.client.getresponse()
+ xsd_doc = lxml_etree.parse(xsd_doc)
+ return lxml_etree.XMLSchema(xsd_doc)
+
+ def request_xmldoc(path=''):
+ url_path = '%s/%s' % (stats_httpd.XML_URL_PATH, path)
+ url_path = urllib.parse.quote(url_path)
+ self.client.putrequest('GET', url_path)
+ self.client.endheaders()
+ xml_doc = self.client.getresponse()
+ return lxml_etree.parse(xml_doc)
+
+ # request XSD and XML
+ xsd = request_xsd()
+ xml_doc = request_xmldoc()
+ # do validation
+ self.assertTrue(xsd.validate(xml_doc))
+
+ # validate each paths in DUMMY_DATA
+ for path in stats_httpd.item_name_list(DUMMY_DATA, ''):
+ # request XML
+ xml_doc = request_xmldoc(path)
+ # do validation
+ self.assertTrue(xsd.validate(xml_doc))
+
+class TestHttpServerError(unittest.TestCase):
+ """Tests for HttpServerError exception"""
+ def test_raises(self):
+ try:
+ raise stats_httpd.HttpServerError('Nothing')
+ except stats_httpd.HttpServerError as err:
+ self.assertEqual(str(err), 'Nothing')
+
+class TestHttpServer(unittest.TestCase):
+ """Tests for HttpServer class"""
+ def setUp(self):
+ # set the signal handler for deadlock
+ self.sig_handler = SignalHandler(self.fail)
+
+ def tearDown(self):
+ if hasattr(self, "stats_httpd"):
+ self.stats_httpd.stop()
+ # reset the signal handler
+ self.sig_handler.reset()
+
+ def test_httpserver(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertEqual(type(self.stats_httpd.httpd), list)
+ self.assertEqual(len(self.stats_httpd.httpd), 1)
+ for httpd in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(httpd, stats_httpd.HttpServer))
+
+class TestStatsHttpdError(unittest.TestCase):
+ """Tests for StatsHttpdError exception"""
+
+ def test_raises1(self):
+ try:
+ raise stats_httpd.StatsHttpdError('Nothing')
+ except stats_httpd.StatsHttpdError as err:
+ self.assertEqual(str(err), 'Nothing')
+
+ def test_raises2(self):
+ try:
+ raise stats_httpd.StatsHttpdDataError('Nothing')
+ except stats_httpd.StatsHttpdDataError as err:
+ self.assertEqual(str(err), 'Nothing')
+
+class TestStatsHttpd(unittest.TestCase):
+ """Tests for StatsHttpd class"""
+
+ def setUp(self):
+ # set the signal handler for deadlock
+ self.sig_handler = SignalHandler(self.fail)
+ # checking IPv6 enabled on this platform
+ self.ipv6_enabled = is_ipv6_enabled()
+ # instantiation of StatsHttpd indirectly calls gethostbyaddr(), which
+ # can block for an uncontrollable period, leading many undesirable
+ # results. We should rather eliminate the reliance, but until we
+ # can make such fundamental cleanup we replace it with a faked method;
+ # in our test scenario the return value doesn't matter.
+ self.__gethostbyaddr_orig = socket.gethostbyaddr
+ socket.gethostbyaddr = lambda x: ('test.example.', [], None)
+
+ # Some tests replace this library function. Keep the original for
+ # restor
+ self.__orig_select_select = select.select
+
+ def tearDown(self):
+ socket.gethostbyaddr = self.__gethostbyaddr_orig
+ if hasattr(self, "stats_httpd"):
+ self.stats_httpd.stop()
+ # reset the signal handler
+ self.sig_handler.reset()
+
+ # restore original of replaced library
+ select.select = self.__orig_select_select
+
+ def test_init(self):
+ server_address = get_availaddr()
+ self.stats_httpd = MyStatsHttpd(server_address)
+ self.assertEqual(self.stats_httpd.running, False)
+ self.assertEqual(self.stats_httpd.poll_intval, 0.5)
+ self.assertNotEqual(len(self.stats_httpd.httpd), 0)
+ self.assertIsNotNone(self.stats_httpd.mccs)
+ self.assertIsNotNone(self.stats_httpd.cc_session)
+ # The real CfgMgr would return 'version', but our test mock omits it,
+ # so the len(config) should be 1
+ self.assertEqual(len(self.stats_httpd.config), 1)
+ self.assertTrue('listen_on' in self.stats_httpd.config)
+ self.assertEqual(len(self.stats_httpd.config['listen_on']), 1)
+ self.assertTrue('address' in self.stats_httpd.config['listen_on'][0])
+ self.assertTrue('port' in self.stats_httpd.config['listen_on'][0])
+ self.assertTrue(server_address in set(self.stats_httpd.http_addrs))
+ self.assertEqual('StatsHttpd', self.stats_httpd.mccs.\
+ get_module_spec().get_module_name())
+
+ def test_init_hterr(self):
+ """Test the behavior of StatsHttpd constructor when open_httpd fails.
+
+ We specifically check the following two:
+ - close_mccs() is called (so stats-httpd tells ConfigMgr it's shutting
+ down)
+ - the constructor results in HttpServerError exception.
+
+ """
+ self.__mccs_closed = False
+ def call_checker():
+ self.__mccs_closed = True
+ class FailingStatsHttpd(MyStatsHttpd):
+ def open_httpd(self):
+ raise stats_httpd.HttpServerError
+ def close_mccs(self):
+ call_checker()
+ self.assertRaises(stats_httpd.HttpServerError, FailingStatsHttpd)
+ self.assertTrue(self.__mccs_closed)
+
+ def test_openclose_mccs(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ mccs = self.stats_httpd.mccs
+ self.assertFalse(self.stats_httpd.mccs.stopped)
+ self.assertFalse(self.stats_httpd.mccs.closed)
+ self.stats_httpd.close_mccs()
+ self.assertTrue(mccs.stopped)
+ self.assertTrue(mccs.closed)
+ self.assertIsNone(self.stats_httpd.mccs)
+ self.stats_httpd.open_mccs()
+ self.assertIsNotNone(self.stats_httpd.mccs)
+ self.stats_httpd.mccs = None
+ self.assertIsNone(self.stats_httpd.mccs)
+ self.assertIsNone(self.stats_httpd.close_mccs())
+
+ def test_mccs(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertIsNotNone(self.stats_httpd.mccs.get_socket())
+ self.assertTrue(
+ isinstance(self.stats_httpd.mccs.get_socket(), socket.socket))
+ self.assertIsNotNone(self.stats_httpd.cc_session)
+ statistics_spec = self.stats_httpd.get_stats_spec()
+ for mod in DUMMY_DATA:
+ self.assertTrue(mod in statistics_spec)
+ for cfg in statistics_spec[mod]:
+ self.assertTrue('item_name' in cfg)
+ self.assertTrue(cfg['item_name'] in DUMMY_DATA[mod])
+ self.assertTrue(len(statistics_spec[mod]), len(DUMMY_DATA[mod]))
+ self.stats_httpd.close_mccs()
+ self.assertIsNone(self.stats_httpd.mccs)
+
+ def test_httpd(self):
+ # dual stack (addresses is ipv4 and ipv6)
+ if self.ipv6_enabled:
+ server_addresses = (get_availaddr('::1'), get_availaddr())
+ self.stats_httpd = MyStatsHttpd(*server_addresses)
+ for ht in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
+ self.assertTrue(ht.address_family in set([socket.AF_INET,
+ socket.AF_INET6]))
+ self.assertTrue(isinstance(ht.socket, socket.socket))
+ ht.socket.close() # to silence warning about resource leak
+ self.stats_httpd.close_mccs() # ditto
+
+ # dual stack (address is ipv6)
+ if self.ipv6_enabled:
+ server_addresses = get_availaddr('::1')
+ self.stats_httpd = MyStatsHttpd(server_addresses)
+ for ht in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
+ self.assertEqual(ht.address_family, socket.AF_INET6)
+ self.assertTrue(isinstance(ht.socket, socket.socket))
+ ht.socket.close()
+ self.stats_httpd.close_mccs() # ditto
+
+ # dual/single stack (address is ipv4)
+ server_addresses = get_availaddr()
+ self.stats_httpd = MyStatsHttpd(server_addresses)
+ for ht in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
+ self.assertEqual(ht.address_family, socket.AF_INET)
+ self.assertTrue(isinstance(ht.socket, socket.socket))
+ ht.socket.close()
+ self.stats_httpd.close_mccs()
+
+ def test_httpd_anyIPv4(self):
+ # any address (IPv4)
+ server_addresses = get_availaddr(address='0.0.0.0')
+ self.stats_httpd = MyStatsHttpd(server_addresses)
+ for ht in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
+ self.assertEqual(ht.address_family,socket.AF_INET)
+ self.assertTrue(isinstance(ht.socket, socket.socket))
+
+ def test_httpd_anyIPv6(self):
+ # any address (IPv6)
+ if self.ipv6_enabled:
+ server_addresses = get_availaddr(address='::')
+ self.stats_httpd = MyStatsHttpd(server_addresses)
+ for ht in self.stats_httpd.httpd:
+ self.assertTrue(isinstance(ht, stats_httpd.HttpServer))
+ self.assertEqual(ht.address_family,socket.AF_INET6)
+ self.assertTrue(isinstance(ht.socket, socket.socket))
+
+ def test_httpd_failed(self):
+ # existent hostname
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ get_availaddr(address='localhost'))
+
+ # nonexistent hostname
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ ('my.host.domain', 8000))
+
+ # over flow of port number
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ ('127.0.0.1', 80000))
+
+ # negative
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ ('127.0.0.1', -8000))
+
+ # alphabet
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ ('127.0.0.1', 'ABCDE'))
+
+ # Address already in use
+ server_addresses = get_availaddr()
+ server = MyStatsHttpd(server_addresses)
+ self.assertRaises(stats_httpd.HttpServerError, MyStatsHttpd,
+ server_addresses)
+
+ def __faked_select(self, ex=None):
+ """A helper subroutine for tests using faked select.select.
+
+ See test_running() for basic features. If ex is not None,
+ it's assumed to be an exception object and will be raised on the
+ first call.
+
+ """
+ self.assertTrue(self.stats_httpd.running)
+ self.__call_count += 1
+ if ex is not None and self.__call_count == 1:
+ raise ex
+ if self.__call_count == 2:
+ self.stats_httpd.running = False
+ assert self.__call_count <= 2 # safety net to avoid infinite loop
+ return ([], [], [])
+
+ def test_running(self):
+ # Previous version of this test checks the result of "status" and
+ # "shutdown" commands; however, they are more explicitly tested
+ # in specific tests. In this test we only have to check:
+ # - start() will set 'running' to True
+ # - as long as 'running' is True, it keeps calling select.select
+ # - when running becomes False, it exists from the loop and calls
+ # stop()
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertFalse(self.stats_httpd.running)
+
+ # In this test we'll call select.select() 2 times: on the first call
+ # stats_httpd.running should be True; on the second call the faked
+ # select() will set it to False.
+ self.__call_count = 0
+ select.select = lambda r, w, x, t: self.__faked_select()
+ self.stats_httpd.start()
+ self.assertFalse(self.stats_httpd.running)
+ self.assertIsNone(self.stats_httpd.mccs) # stop() clears .mccs
+
+ def test_running_fail(self):
+ # A failure case of start(): we close the (real but dummy) socket for
+ # the CC session. This breaks the select-loop due to exception
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.stats_httpd.mccs.get_socket().close()
+ self.assertRaises(ValueError, self.stats_httpd.start)
+
+ def test_failure_with_a_select_error (self):
+ """checks select.error is raised if the exception except
+ errno.EINTR is raised while it's selecting"""
+ def raise_select_except(*args):
+ raise select.error('dummy error')
+ select.select = raise_select_except
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertRaises(select.error, self.stats_httpd.start)
+
+ def test_nofailure_with_errno_EINTR(self):
+ """checks no exception is raised if errno.EINTR is raised
+ while it's selecting"""
+ self.__call_count = 0
+ select.select = lambda r, w, x, t: self.__faked_select(
+ select.error(errno.EINTR))
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.stats_httpd.start() # shouldn't leak the exception
+ self.assertFalse(self.stats_httpd.running)
+ self.assertIsNone(self.stats_httpd.mccs)
+
+ def test_open_template(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ # successful conditions
+ tmpl = self.stats_httpd.open_template(
+ stats_httpd.XML_TEMPLATE_LOCATION)
+ self.assertTrue(isinstance(tmpl, string.Template))
+ opts = dict(
+ xml_string="<dummy></dummy>",
+ xsl_url_path="/path/to/")
+ lines = tmpl.substitute(opts)
+ for n in opts:
+ self.assertGreater(lines.find(opts[n]), 0)
+ tmpl = self.stats_httpd.open_template(
+ stats_httpd.XSD_TEMPLATE_LOCATION)
+ self.assertTrue(isinstance(tmpl, string.Template))
+ opts = dict(xsd_namespace="http://host/path/to/")
+ lines = tmpl.substitute(opts)
+ for n in opts:
+ self.assertGreater(lines.find(opts[n]), 0)
+ tmpl = self.stats_httpd.open_template(
+ stats_httpd.XSL_TEMPLATE_LOCATION)
+ self.assertTrue(isinstance(tmpl, string.Template))
+ opts = dict(xsd_namespace="http://host/path/to/")
+ lines = tmpl.substitute(opts)
+ for n in opts:
+ self.assertGreater(lines.find(opts[n]), 0)
+ # unsuccessful condition
+ self.assertRaises(
+ stats_httpd.StatsHttpdDataError,
+ self.stats_httpd.open_template, '/path/to/foo/bar')
+
+ def test_commands(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertEqual(self.stats_httpd.command_handler("status", None),
+ isc.config.ccsession.create_answer(
+ 0, "Stats Httpd is up. (PID " + str(os.getpid()) + ")"))
+ self.stats_httpd.running = True
+ self.assertEqual(self.stats_httpd.command_handler("shutdown", None),
+ isc.config.ccsession.create_answer(0))
+ self.assertFalse(self.stats_httpd.running)
+ self.assertEqual(
+ self.stats_httpd.command_handler("__UNKNOWN_COMMAND__", None),
+ isc.config.ccsession.create_answer(
+ 1, "Unknown command: __UNKNOWN_COMMAND__"))
+
+ def test_config(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ self.assertEqual(
+ self.stats_httpd.config_handler(dict(_UNKNOWN_KEY_=None)),
+ isc.config.ccsession.create_answer(
+ 1, "unknown item _UNKNOWN_KEY_"))
+
+ addresses = get_availaddr()
+ self.assertEqual(
+ self.stats_httpd.config_handler(
+ dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
+ isc.config.ccsession.create_answer(0))
+ self.assertTrue("listen_on" in self.stats_httpd.config)
+ for addr in self.stats_httpd.config["listen_on"]:
+ self.assertTrue("address" in addr)
+ self.assertTrue("port" in addr)
+ self.assertTrue(addr["address"] == addresses[0])
+ self.assertTrue(addr["port"] == addresses[1])
+
+ if self.ipv6_enabled:
+ addresses = get_availaddr("::1")
+ self.assertEqual(
+ self.stats_httpd.config_handler(
+ dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
+ isc.config.ccsession.create_answer(0))
+ self.assertTrue("listen_on" in self.stats_httpd.config)
+ for addr in self.stats_httpd.config["listen_on"]:
+ self.assertTrue("address" in addr)
+ self.assertTrue("port" in addr)
+ self.assertTrue(addr["address"] == addresses[0])
+ self.assertTrue(addr["port"] == addresses[1])
+
+ addresses = get_availaddr()
+ self.assertEqual(
+ self.stats_httpd.config_handler(
+ dict(listen_on=[dict(address=addresses[0],port=addresses[1])])),
+ isc.config.ccsession.create_answer(0))
+ self.assertTrue("listen_on" in self.stats_httpd.config)
+ for addr in self.stats_httpd.config["listen_on"]:
+ self.assertTrue("address" in addr)
+ self.assertTrue("port" in addr)
+ self.assertTrue(addr["address"] == addresses[0])
+ self.assertTrue(addr["port"] == addresses[1])
+ (ret, arg) = isc.config.ccsession.parse_answer(
+ self.stats_httpd.config_handler(
+ dict(listen_on=[dict(address="1.2.3.4",port=543210)]))
+ )
+ self.assertEqual(ret, 1)
+
+ @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
+ def test_xml_handler(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ module_name = 'Dummy'
+ stats_spec = \
+ { module_name :
+ [{
+ "item_name": "foo",
+ "item_type": "string",
+ "item_optional": False,
+ "item_default": "bar",
+ "item_description": "foo is bar",
+ "item_title": "Foo"
+ },
+ {
+ "item_name": "foo2",
+ "item_type": "list",
+ "item_optional": False,
+ "item_default": [
+ {
+ "zonename" : "test1",
+ "queries.udp" : 1,
+ "queries.tcp" : 2
+ },
+ {
+ "zonename" : "test2",
+ "queries.udp" : 3,
+ "queries.tcp" : 4
+ }
+ ],
+ "item_title": "Foo bar",
+ "item_description": "Foo bar",
+ "list_item_spec": {
+ "item_name": "foo2-1",
+ "item_type": "map",
+ "item_optional": False,
+ "item_default": {},
+ "map_item_spec": [
+ {
+ "item_name": "foo2-1-1",
+ "item_type": "string",
+ "item_optional": False,
+ "item_default": "",
+ "item_title": "Foo2 1 1",
+ "item_description": "Foo bar"
+ },
+ {
+ "item_name": "foo2-1-2",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Foo2 1 2",
+ "item_description": "Foo bar"
+ },
+ {
+ "item_name": "foo2-1-3",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Foo2 1 3",
+ "item_description": "Foo bar"
+ }
+ ]
+ }
+ }]
+ }
+ stats_data = \
+ { module_name : { 'foo':'bar',
+ 'foo2': [
+ {
+ "foo2-1-1" : "bar1",
+ "foo2-1-2" : 10,
+ "foo2-1-3" : 9
+ },
+ {
+ "foo2-1-1" : "bar2",
+ "foo2-1-2" : 8,
+ "foo2-1-3" : 7
+ }
+ ] } }
+ self.stats_httpd.get_stats_spec = lambda x,y: stats_spec
+ self.stats_httpd.get_stats_data = lambda x,y: stats_data
+ xml_string = self.stats_httpd.xml_handler()
+ stats_xml = xml.etree.ElementTree.fromstring(xml_string)
+ schema_loc = '{%s}schemaLocation' % XMLNS_XSI
+ self.assertEqual(stats_xml.attrib[schema_loc],
+ stats_httpd.XML_ROOT_ATTRIB['xsi:schemaLocation'])
+ stats_data = stats_data[module_name]
+ stats_spec = stats_spec[module_name]
+ names = stats_httpd.item_name_list(stats_data, '')
+ for i in range(0, len(names)):
+ self.assertEqual('%s/%s' % (module_name, names[i]), stats_xml[i].attrib['identifier'])
+ value = isc.cc.data.find(stats_data, names[i])
+ if type(value) is int:
+ value = str(value)
+ if type(value) is dict or type(value) is list:
+ self.assertFalse('value' in stats_xml[i].attrib)
+ else:
+ self.assertEqual(value, stats_xml[i].attrib['value'])
+ self.assertEqual(module_name, stats_xml[i].attrib['owner'])
+ self.assertEqual(urllib.parse.quote('%s/%s/%s' % (stats_httpd.XML_URL_PATH,
+ module_name, names[i])),
+ stats_xml[i].attrib['uri'])
+ spec = isc.config.find_spec_part(stats_spec, names[i])
+ self.assertEqual(spec['item_name'], stats_xml[i].attrib['name'])
+ self.assertEqual(spec['item_type'], stats_xml[i].attrib['type'])
+ self.assertEqual(spec['item_description'], stats_xml[i].attrib['description'])
+ self.assertEqual(spec['item_title'], stats_xml[i].attrib['title'])
+ self.assertEqual(str(spec['item_optional']).lower(), stats_xml[i].attrib['optional'])
+ default = spec['item_default']
+ if type(default) is int:
+ default = str(default)
+ if type(default) is dict or type(default) is list:
+ self.assertFalse('default' in stats_xml[i].attrib)
+ else:
+ self.assertEqual(default, stats_xml[i].attrib['default'])
+ self.assertFalse('item_format' in spec)
+ self.assertFalse('format' in stats_xml[i].attrib)
+
+ @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
+ def test_xsd_handler(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ xsd_string = self.stats_httpd.xsd_handler()
+ stats_xsd = xml.etree.ElementTree.fromstring(xsd_string)
+ ns = '{%s}' % XMLNS_XSD
+ stats_xsd = stats_xsd[1].find('%scomplexType/%ssequence/%selement' % ((ns,)*3))
+ self.assertEqual('item', stats_xsd.attrib['name'])
+ stats_xsd = stats_xsd.find('%scomplexType' % ns)
+ type_types = ('boolean', 'integer', 'real', 'string', 'map', \
+ 'list', 'named_set', 'any')
+ attribs = [('identifier', 'string', 'required'),
+ ('value', 'string', 'optional'),
+ ('owner', 'string', 'required'),
+ ('uri', 'anyURI', 'required'),
+ ('name', 'string', 'required'),
+ ('type', type_types, 'required'),
+ ('description', 'string', 'optional'),
+ ('title', 'string', 'optional'),
+ ('optional', 'boolean', 'optional'),
+ ('default', 'string', 'optional'),
+ ('format', 'string', 'optional')]
+ for i in range(0, len(attribs)):
+ self.assertEqual(attribs[i][0], stats_xsd[i].attrib['name'])
+ if attribs[i][0] == 'type':
+ stats_xsd_types = \
+ stats_xsd[i].find('%ssimpleType/%srestriction' % \
+ ((ns,)*2))
+ for j in range(0, len(attribs[i][1])):
+ self.assertEqual(attribs[i][1][j], \
+ stats_xsd_types[j].attrib['value'])
+ else:
+ self.assertEqual(attribs[i][1], stats_xsd[i].attrib['type'])
+ self.assertEqual(attribs[i][2], stats_xsd[i].attrib['use'])
+
+ @unittest.skipUnless(xml_parser, "skipping the test using XMLParser")
+ def test_xsl_handler(self):
+ self.stats_httpd = MyStatsHttpd(get_availaddr())
+ xsl_string = self.stats_httpd.xsl_handler()
+ stats_xsl = xml.etree.ElementTree.fromstring(xsl_string)
+ nst = '{%s}' % XMLNS_XSL
+ nsx = '{%s}' % XMLNS_XHTML
+ self.assertEqual("bind10:statistics", stats_xsl[2].attrib['match'])
+ stats_xsl = stats_xsl[2].find('%stable' % nsx)
+ self.assertEqual('item', stats_xsl[1].attrib['select'])
+ stats_xsl = stats_xsl[1].find('%str' % nsx)
+ self.assertEqual('@uri', stats_xsl[0].find(
+ '%selement/%sattribute/%svalue-of' % ((nst,)*3)).attrib['select'])
+ self.assertEqual('@identifier', stats_xsl[0].find(
+ '%selement/%svalue-of' % ((nst,)*2)).attrib['select'])
+ self.assertEqual('@value', stats_xsl[1].find('%sif' % nst).attrib['test'])
+ self.assertEqual('@value', stats_xsl[1].find('%sif/%svalue-of' % ((nst,)*2)).attrib['select'])
+ self.assertEqual('@description', stats_xsl[2].find('%sif' % nst).attrib['test'])
+ self.assertEqual('@description', stats_xsl[2].find('%sif/%svalue-of' % ((nst,)*2)).attrib['select'])
+
+class Z_TestStatsHttpdError(unittest.TestCase):
+ def test_for_without_B10_FROM_SOURCE(self):
+ # Note: this test is sensitive due to its substantial side effect of
+ # reloading. For exmaple, it affects tests that tweak module
+ # attributes (such as test_init_hterr). It also breaks logging
+ # setting for unit tests. To minimize these effects, we use
+ # workaround: make it very likely to run at the end of the tests
+ # by naming the test class "Z_".
+
+ # just lets it go through the code without B10_FROM_SOURCE env
+ # variable
+ if "B10_FROM_SOURCE" in os.environ:
+ tmppath = os.environ["B10_FROM_SOURCE"]
+ os.environ.pop("B10_FROM_SOURCE")
+ imp.reload(stats_httpd)
+ os.environ["B10_FROM_SOURCE"] = tmppath
+ imp.reload(stats_httpd)
+
+if __name__ == "__main__":
+ isc.log.resetUnitTestRootLogger()
+ unittest.main()
diff --git a/src/bin/stats/tests/stats_test.py b/src/bin/stats/tests/stats_test.py
new file mode 100644
index 0000000..437f0a8
--- /dev/null
+++ b/src/bin/stats/tests/stats_test.py
@@ -0,0 +1,1428 @@
+# Copyright (C) 2010, 2011, 2012 Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+In each of these tests we start several virtual components. They are
+not the real components, no external processes are started. They are
+just simple mock objects running each in its own thread and pretending
+to be bind10 modules. This helps testing the stats module in a close
+to real environment.
+"""
+
+import unittest
+import os
+import io
+import time
+import imp
+import sys
+
+import stats
+import isc.log
+from test_utils import MyStats
+
+class TestUtilties(unittest.TestCase):
+ items = [
+ { 'item_name': 'test_int1', 'item_type': 'integer', 'item_default': 12345 },
+ { 'item_name': 'test_real1', 'item_type': 'real', 'item_default': 12345.6789 },
+ { 'item_name': 'test_bool1', 'item_type': 'boolean', 'item_default': True },
+ { 'item_name': 'test_str1', 'item_type': 'string', 'item_default': 'ABCD' },
+ { 'item_name': 'test_list1', 'item_type': 'list', 'item_default': [1,2,3],
+ 'list_item_spec' : { 'item_name': 'number', 'item_type': 'integer' } },
+ { 'item_name': 'test_map1', 'item_type': 'map', 'item_default': {'a':1,'b':2,'c':3},
+ 'map_item_spec' : [ { 'item_name': 'a', 'item_type': 'integer'},
+ { 'item_name': 'b', 'item_type': 'integer'},
+ { 'item_name': 'c', 'item_type': 'integer'} ] },
+ { 'item_name': 'test_int2', 'item_type': 'integer' },
+ { 'item_name': 'test_real2', 'item_type': 'real' },
+ { 'item_name': 'test_bool2', 'item_type': 'boolean' },
+ { 'item_name': 'test_str2', 'item_type': 'string' },
+ { 'item_name': 'test_list2', 'item_type': 'list',
+ 'list_item_spec' : { 'item_name': 'number', 'item_type': 'integer' } },
+ { 'item_name': 'test_map2', 'item_type': 'map',
+ 'map_item_spec' : [ { 'item_name': 'A', 'item_type': 'integer'},
+ { 'item_name': 'B', 'item_type': 'integer'},
+ { 'item_name': 'C', 'item_type': 'integer'} ] },
+ { 'item_name': 'test_none', 'item_type': 'none' },
+ { 'item_name': 'test_list3', 'item_type': 'list', 'item_default': ["one","two","three"],
+ 'list_item_spec' : { 'item_name': 'number', 'item_type': 'string' } },
+ { 'item_name': 'test_map3', 'item_type': 'map', 'item_default': {'a':'one','b':'two','c':'three'},
+ 'map_item_spec' : [ { 'item_name': 'a', 'item_type': 'string'},
+ { 'item_name': 'b', 'item_type': 'string'},
+ { 'item_name': 'c', 'item_type': 'string'} ] },
+ {
+ 'item_name': 'test_named_set',
+ 'item_type': 'named_set',
+ 'item_default': { },
+ 'named_set_item_spec': {
+ 'item_name': 'name',
+ 'item_type': 'map',
+ 'item_default': { },
+ 'map_item_spec': [
+ {
+ 'item_name': 'number1',
+ 'item_type': 'integer'
+ },
+ {
+ 'item_name': 'number2',
+ 'item_type': 'integer'
+ }
+ ]
+ }
+ }
+ ]
+
+ def setUp(self):
+ self.const_timestamp = 1308730448.965706
+ self.const_timetuple = (2011, 6, 22, 8, 14, 8, 2, 173, 0)
+ self.const_datetime = '2011-06-22T08:14:08Z'
+ self.__orig_time = stats.time
+ self.__orig_gmtime = stats.gmtime
+ stats.time = lambda : self.const_timestamp
+ stats.gmtime = lambda : self.const_timetuple
+
+ def tearDown(self):
+ stats.time = self.__orig_time
+ stats.gmtime = self.__orig_gmtime
+
+ def test_get_spec_defaults(self):
+ self.assertEqual(
+ stats.get_spec_defaults(self.items), {
+ 'test_int1' : 12345 ,
+ 'test_real1' : 12345.6789 ,
+ 'test_bool1' : True ,
+ 'test_str1' : 'ABCD' ,
+ 'test_list1' : [1,2,3] ,
+ 'test_map1' : {'a':1,'b':2,'c':3},
+ 'test_int2' : 0 ,
+ 'test_real2' : 0.0,
+ 'test_bool2' : False,
+ 'test_str2' : "",
+ 'test_list2' : [0],
+ 'test_map2' : { 'A' : 0, 'B' : 0, 'C' : 0 },
+ 'test_none' : None,
+ 'test_list3' : [ "one", "two", "three" ],
+ 'test_map3' : { 'a' : 'one', 'b' : 'two', 'c' : 'three' },
+ 'test_named_set' : {} })
+ self.assertEqual(stats.get_spec_defaults(None), {})
+ self.assertRaises(KeyError, stats.get_spec_defaults, [{'item_name':'Foo'}])
+
+ def test_get_timestamp(self):
+ self.assertEqual(stats.get_timestamp(), self.const_timestamp)
+
+ def test_get_datetime(self):
+ self.assertEqual(stats.get_datetime(), self.const_datetime)
+ self.assertNotEqual(stats.get_datetime(
+ (2011, 6, 22, 8, 23, 40, 2, 173, 0)), self.const_datetime)
+
+ def test__accum(self):
+ self.assertEqual(stats._accum(None, None), None)
+ self.assertEqual(stats._accum(None, "b"), "b")
+ self.assertEqual(stats._accum("a", None), "a")
+ self.assertEqual(stats._accum(1, 2), 3)
+ self.assertEqual(stats._accum(0.5, 0.3), 0.8)
+ self.assertEqual(stats._accum('aa','bb'), 'bb')
+ self.assertEqual(stats._accum('1970-01-01T09:00:00Z','2012-08-09T09:33:31Z'),
+ '2012-08-09T09:33:31Z')
+ self.assertEqual(stats._accum(
+ [1, 2, 3], [4, 5]), [5, 7, 3])
+ self.assertEqual(stats._accum(
+ [4, 5], [1, 2, 3]), [5, 7, 3])
+ self.assertEqual(stats._accum(
+ [1, 2, 3], [None, 5, 6]), [1, 7, 9])
+ self.assertEqual(stats._accum(
+ [None, 5, 6], [1, 2, 3]), [1, 7, 9])
+ self.assertEqual(stats._accum(
+ [1, 2, 3], [None, None, None, None]), [1,2,3,None])
+ self.assertEqual(stats._accum(
+ [[1,2],3],[[],5,6]), [[1,2],8,6])
+ self.assertEqual(stats._accum(
+ {'one': 1, 'two': 2, 'three': 3},
+ {'one': 4, 'two': 5}),
+ {'one': 5, 'two': 7, 'three': 3})
+ self.assertEqual(stats._accum(
+ {'one': 1, 'two': 2, 'three': 3},
+ {'four': 4, 'five': 5}),
+ {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5})
+ self.assertEqual(stats._accum(
+ {'one': [1, 2], 'two': [3, None, 5], 'three': [None, 3, None]},
+ {'one': [2], 'two': [4, 5], 'three': [None, None, None], 'four': 'FOUR'}),
+ {'one':[3,2], 'two':[7,5,5], 'three':[None,3,None], 'four': 'FOUR'})
+ self.assertEqual(stats._accum(
+ [ {'one': 1, 'two': 2, 'three': 3}, {'four': 4, 'five': 5, 'six': 6} ],
+ [ {}, {'four': 1, 'five': 2, 'six': 3} ]),
+ [ {'one': 1, 'two': 2, 'three': 3}, {'four': 5, 'five': 7, 'six': 9} ])
+
+ def test_merge_oldnre(self):
+ self.assertEqual(stats.merge_oldnew(1, 2), 2)
+ self.assertEqual(stats.merge_oldnew(0.5, 0.3), 0.3)
+ self.assertEqual(stats.merge_oldnew('aa','bb'), 'bb')
+ self.assertEqual(stats.merge_oldnew(
+ [1, 2, 3], [4, 5]), [4, 5, 3])
+ self.assertEqual(stats.merge_oldnew(
+ [4, 5], [1, 2, 3]), [1, 2, 3])
+ self.assertEqual(stats.merge_oldnew(
+ [1, 2, 3], [None, 5, 6]), [None, 5, 6])
+ self.assertEqual(stats.merge_oldnew(
+ [None, 5, 6], [1, 2, 3]), [1, 2, 3])
+ self.assertEqual(stats.merge_oldnew(
+ [1, 2, 3], [None, None, None, None]), [None, None, None, None])
+ self.assertEqual(stats.merge_oldnew(
+ [[1,2],3],[[],5,6]), [[1,2],5,6])
+ self.assertEqual(stats.merge_oldnew(
+ {'one': 1, 'two': 2, 'three': 3},
+ {'one': 4, 'two': 5}),
+ {'one': 4, 'two': 5, 'three': 3})
+ self.assertEqual(stats.merge_oldnew(
+ {'one': 1, 'two': 2, 'three': 3},
+ {'four': 4, 'five': 5}),
+ {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5})
+ self.assertEqual(stats.merge_oldnew(
+ {'one': [1, 2], 'two': [3, None, 5], 'three': [None, 3, None]},
+ {'one': [2], 'two': [4, 5], 'three': [None, None, None], 'four': 'FOUR'}),
+ {'one':[2,2], 'two':[4,5,5], 'three':[None,None,None], 'four': 'FOUR'})
+ self.assertEqual(stats.merge_oldnew(
+ [ {'one': 1, 'two': 2, 'three': 3}, {'four': 4, 'five': 5, 'six': 6} ],
+ [ {}, {'four': 1, 'five': 2, 'six': 3} ]),
+ [ {'one': 1, 'two': 2, 'three': 3}, {'four': 1, 'five': 2, 'six': 3} ])
+
+class TestCallback(unittest.TestCase):
+ def setUp(self):
+ self.dummy_func = lambda *x, **y : (x, y)
+ self.dummy_args = (1,2,3)
+ self.dummy_kwargs = {'a':1,'b':2,'c':3}
+ self.cback1 = stats.Callback(
+ command=self.dummy_func,
+ args=self.dummy_args,
+ kwargs=self.dummy_kwargs
+ )
+ self.cback2 = stats.Callback(
+ args=self.dummy_args,
+ kwargs=self.dummy_kwargs
+ )
+ self.cback3 = stats.Callback(
+ command=self.dummy_func,
+ kwargs=self.dummy_kwargs
+ )
+ self.cback4 = stats.Callback(
+ command=self.dummy_func,
+ args=self.dummy_args
+ )
+
+ def test_init(self):
+ self.assertEqual((self.cback1.command, self.cback1.args, self.cback1.kwargs),
+ (self.dummy_func, self.dummy_args, self.dummy_kwargs))
+ self.assertEqual((self.cback2.command, self.cback2.args, self.cback2.kwargs),
+ (None, self.dummy_args, self.dummy_kwargs))
+ self.assertEqual((self.cback3.command, self.cback3.args, self.cback3.kwargs),
+ (self.dummy_func, (), self.dummy_kwargs))
+ self.assertEqual((self.cback4.command, self.cback4.args, self.cback4.kwargs),
+ (self.dummy_func, self.dummy_args, {}))
+
+ def test_call(self):
+ self.assertEqual(self.cback1(), (self.dummy_args, self.dummy_kwargs))
+ self.assertEqual(self.cback1(100, 200), ((100, 200), self.dummy_kwargs))
+ self.assertEqual(self.cback1(a=100, b=200), (self.dummy_args, {'a':100, 'b':200}))
+ self.assertEqual(self.cback2(), None)
+ self.assertEqual(self.cback3(), ((), self.dummy_kwargs))
+ self.assertEqual(self.cback3(100, 200), ((100, 200), self.dummy_kwargs))
+ self.assertEqual(self.cback3(a=100, b=200), ((), {'a':100, 'b':200}))
+ self.assertEqual(self.cback4(), (self.dummy_args, {}))
+ self.assertEqual(self.cback4(100, 200), ((100, 200), {}))
+ self.assertEqual(self.cback4(a=100, b=200), (self.dummy_args, {'a':100, 'b':200}))
+
+class TestStats(unittest.TestCase):
+ def setUp(self):
+ # set the signal handler for deadlock
+ self.const_timestamp = 1308730448.965706
+ self.const_datetime = '2011-06-22T08:14:08Z'
+ self.const_default_datetime = '1970-01-01T00:00:00Z'
+ # Record original module-defined functions in case we replace them
+ self.__orig_timestamp = stats.get_timestamp
+ self.__orig_get_datetime = stats.get_datetime
+
+ def tearDown(self):
+ # restore the stored original function in case we replaced them
+ stats.get_timestamp = self.__orig_timestamp
+ stats.get_datetime = self.__orig_get_datetime
+
+ def test_init(self):
+ self.stats = MyStats()
+ self.assertEqual(self.stats.module_name, 'Stats')
+ self.assertFalse(self.stats.running)
+ self.assertTrue('command_show' in self.stats.callbacks)
+ self.assertTrue('command_status' in self.stats.callbacks)
+ self.assertTrue('command_shutdown' in self.stats.callbacks)
+ self.assertTrue('command_show' in self.stats.callbacks)
+ self.assertTrue('command_showschema' in self.stats.callbacks)
+ self.assertEqual(self.stats.config['poll-interval'], 60)
+
+ def test_init_undefcmd(self):
+ spec_str = """\
+{
+ "module_spec": {
+ "module_name": "Stats",
+ "module_description": "Stats daemon",
+ "config_data": [],
+ "commands": [
+ {
+ "command_name": "_undef_command_",
+ "command_description": "a undefined command in stats",
+ "command_args": []
+ }
+ ],
+ "statistics": []
+ }
+}
+"""
+ orig_spec_location = stats.SPECFILE_LOCATION
+ stats.SPECFILE_LOCATION = io.StringIO(spec_str)
+ self.assertRaises(stats.StatsError, MyStats)
+ stats.SPECFILE_LOCATION = orig_spec_location
+
+ def __send_command(self, stats, command_name, params=None):
+ '''Emulate a command arriving to stats by directly calling callback'''
+ return isc.config.ccsession.parse_answer(
+ stats.command_handler(command_name, params))
+
+ def test_start(self):
+ # Define a separate exception class so we can be sure that's actually
+ # the one raised in __check_start() below
+ class CheckException(Exception):
+ pass
+
+ def __check_start(tested_stats):
+ self.assertTrue(tested_stats.running)
+ raise CheckException # terminate the loop
+
+ # start without err
+ self.stats = MyStats()
+ self.assertFalse(self.stats.running)
+ self.stats._check_command = lambda: __check_start(self.stats)
+ # We are going to confirm start() will set running to True, avoiding
+ # to fall into a loop with the exception trick.
+ self.assertRaises(CheckException, self.stats.start)
+ self.assertEqual(self.__send_command(self.stats, "status"),
+ (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
+
+ def test_shutdown(self):
+ def __check_shutdown(tested_stats):
+ self.assertTrue(tested_stats.running)
+ self.assertEqual(self.__send_command(tested_stats, "shutdown"),
+ (0, None))
+ self.assertFalse(tested_stats.running)
+ # override get_interval() so it won't go poll statistics
+ tested_stats.get_interval = lambda : 0
+
+ self.stats = MyStats()
+ self.stats._check_command = lambda: __check_shutdown(self.stats)
+ self.stats.start()
+ self.assertTrue(self.stats.mccs.stopped)
+
+ def test_handlers(self):
+ """Test command_handler"""
+
+ __stats = MyStats()
+
+ # 'show' command. We're going to check the expected methods are
+ # called in the expected order, and check the resulting response.
+ # Details of each method are tested separately.
+ call_log = []
+ def __steal_method(fn_name, *arg):
+ call_log.append((fn_name, arg))
+ if fn_name == 'update_stat':
+ return False # "no error"
+ if fn_name == 'showschema':
+ return isc.config.create_answer(0, 'no error')
+
+ # Fake some methods and attributes for inspection
+ __stats.do_polling = lambda: __steal_method('polling')
+ __stats.update_statistics_data = \
+ lambda x, y, z: __steal_method('update_stat', x, y, z)
+ __stats.update_modules = lambda: __steal_method('update_module')
+ __stats.mccs.lname = 'test lname'
+ __stats.statistics_data = {'Init': {'boot_time': self.const_datetime}}
+
+ # skip initial polling
+ stats.get_timestamp = lambda: 0
+ __stats._lasttime_poll = 0
+
+ stats.get_datetime = lambda: 42 # make the result predictable
+
+ # now send the command
+ self.assertEqual(
+ self.__send_command(
+ __stats, 'show',
+ params={ 'owner' : 'Init', 'name' : 'boot_time' }),
+ (0, {'Init': {'boot_time': self.const_datetime}}))
+ # Check if expected methods are called
+ self.assertEqual([('update_stat',
+ ('Stats', 'test lname',
+ {'timestamp': 0,
+ 'report_time': 42})),
+ ('update_module', ())], call_log)
+
+ # Then update faked timestamp so the initial polling will happen, and
+ # confirm that.
+ call_log = []
+ stats.get_timestamp = lambda: 10
+ self.assertEqual(
+ self.__send_command(
+ __stats, 'show',
+ params={ 'owner' : 'Init', 'name' : 'boot_time' }),
+ (0, {'Init': {'boot_time': self.const_datetime}}))
+ self.assertEqual([('polling', ()),
+ ('update_stat',
+ ('Stats', 'test lname',
+ {'timestamp': 10,
+ 'report_time': 42})),
+ ('update_module', ())], call_log)
+
+ # 'status' command. We can confirm the behavior without any fake
+ self.assertEqual(
+ self.__send_command(__stats, 'status'),
+ (0, "Stats is up. (PID " + str(os.getpid()) + ")"))
+
+ # 'showschema' command. update_modules() will be called, which
+ # (implicitly) confirms the correct method is called; further details
+ # are tested separately.
+ call_log = []
+ (rcode, value) = self.__send_command(__stats, 'showschema')
+ self.assertEqual([('update_module', ())], call_log)
+
+ # Unknown command. Error should be returned
+ self.assertEqual(
+ self.__send_command(__stats, '__UNKNOWN__'),
+ (1, "Unknown command: '__UNKNOWN__'"))
+
+ def test_update_modules(self):
+ """Confirm the behavior of Stats.update_modules().
+
+ It checks whether the expected command is sent to ConfigManager,
+ and whether the answer from ConfigManager is handled as expected.
+
+ """
+
+ def __check_rpc_call(command, group):
+ self.assertEqual('ConfigManager', group)
+ self.assertEqual(command,
+ isc.config.ccsession.COMMAND_GET_STATISTICS_SPEC)
+ answer_value = {'Init': [{
+ "item_name": "boot_time",
+ "item_type": "string",
+ "item_optional": False,
+ # Use a different default so we can check it below
+ "item_default": "2013-01-01T00:00:01Z",
+ "item_title": "Boot time",
+ "item_description": "dummy desc",
+ "item_format": "date-time"
+ }]}
+ return answer_value
+
+ self.stats = MyStats()
+ self.stats.cc_session.rpc_call = __check_rpc_call
+
+ self.stats.update_modules()
+
+ # Stats is always incorporated. For others, only the ones returned
+ # by group_recvmsg() above is available.
+ self.assertTrue('Stats' in self.stats.modules)
+ self.assertTrue('Init' in self.stats.modules)
+ self.assertFalse('Dummy' in self.stats.modules)
+
+ my_statistics_data = stats.get_spec_defaults(
+ self.stats.modules['Stats'].get_statistics_spec())
+ self.assertTrue('report_time' in my_statistics_data)
+ self.assertTrue('boot_time' in my_statistics_data)
+ self.assertTrue('last_update_time' in my_statistics_data)
+ self.assertTrue('timestamp' in my_statistics_data)
+ self.assertTrue('lname' in my_statistics_data)
+ self.assertEqual(my_statistics_data['report_time'],
+ self.const_default_datetime)
+ self.assertEqual(my_statistics_data['boot_time'],
+ self.const_default_datetime)
+ self.assertEqual(my_statistics_data['last_update_time'],
+ self.const_default_datetime)
+ self.assertEqual(my_statistics_data['timestamp'], 0.0)
+ self.assertEqual(my_statistics_data['lname'], "")
+ my_statistics_data = stats.get_spec_defaults(
+ self.stats.modules['Init'].get_statistics_spec())
+ self.assertTrue('boot_time' in my_statistics_data)
+ self.assertEqual(my_statistics_data['boot_time'],
+ "2013-01-01T00:00:01Z")
+
+ # Error case
+ def __raise_on_rpc_call(x, y):
+ raise isc.config.RPCError(99, 'error')
+ orig_parse_answer = stats.isc.config.ccsession.parse_answer
+ self.stats.cc_session.rpc_call = __raise_on_rpc_call
+ self.assertRaises(stats.StatsError, self.stats.update_modules)
+
+ def test_get_statistics_data(self):
+ """Confirm the behavior of Stats.get_statistics_data().
+
+ It should first call update_modules(), and then retrieve the requested
+ data from statistics_data. We confirm this by fake update_modules()
+ where we set the expected data in statistics_data.
+
+ """
+ self.stats = MyStats()
+ def __faked_update_modules():
+ self.stats.statistics_data = { \
+ 'Stats': {
+ 'report_time': self.const_default_datetime,
+ 'boot_time': None,
+ 'last_update_time': None,
+ 'timestamp': 0.0,
+ 'lname': 'dummy name'
+ },
+ 'Init': { 'boot_time': None }
+ }
+
+ self.stats.update_modules = __faked_update_modules
+
+ my_statistics_data = self.stats.get_statistics_data()
+ self.assertTrue('Stats' in my_statistics_data)
+ self.assertTrue('Init' in my_statistics_data)
+ self.assertTrue('boot_time' in my_statistics_data['Init'])
+
+ my_statistics_data = self.stats.get_statistics_data(owner='Stats')
+ self.assertTrue('Stats' in my_statistics_data)
+ self.assertTrue('report_time' in my_statistics_data['Stats'])
+ self.assertTrue('boot_time' in my_statistics_data['Stats'])
+ self.assertTrue('last_update_time' in my_statistics_data['Stats'])
+ self.assertTrue('timestamp' in my_statistics_data['Stats'])
+ self.assertTrue('lname' in my_statistics_data['Stats'])
+ self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
+ owner='Foo')
+
+ my_statistics_data = self.stats.get_statistics_data(
+ owner='Stats', name='report_time')
+ self.assertEqual(my_statistics_data['Stats']['report_time'],
+ self.const_default_datetime)
+
+ my_statistics_data = self.stats.get_statistics_data(
+ owner='Stats', name='boot_time')
+ self.assertTrue('boot_time' in my_statistics_data['Stats'])
+
+ my_statistics_data = self.stats.get_statistics_data(
+ owner='Stats', name='last_update_time')
+ self.assertTrue('last_update_time' in my_statistics_data['Stats'])
+
+ my_statistics_data = self.stats.get_statistics_data(
+ owner='Stats', name='timestamp')
+ self.assertEqual(my_statistics_data['Stats']['timestamp'], 0.0)
+
+ my_statistics_data = self.stats.get_statistics_data(
+ owner='Stats', name='lname')
+ self.assertTrue(len(my_statistics_data['Stats']['lname']) >0)
+ self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
+ owner='Stats', name='Bar')
+ self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
+ owner='Foo', name='Bar')
+ self.assertRaises(stats.StatsError, self.stats.get_statistics_data,
+ name='Bar')
+
+ def test_update_statistics_data(self):
+ """test for list-type statistics"""
+ self.stats = MyStats()
+ _test_exp1 = {
+ 'zonename': 'test1.example',
+ 'queries.tcp': 5,
+ 'queries.udp': 4
+ }
+ _test_exp2 = {
+ 'zonename': 'test2.example',
+ 'queries.tcp': 3,
+ 'queries.udp': 2
+ }
+ _test_exp3 = {}
+ _test_exp4 = {
+ 'queries.udp': 4
+ }
+ _test_exp5_1 = {
+ 'queries.perzone': [
+ { },
+ {
+ 'queries.udp': 9876
+ }
+ ]
+ }
+ _test_exp5_2 = {
+ 'queries.perzone[1]/queries.udp':
+ isc.cc.data.find(_test_exp5_1,
+ 'queries.perzone[1]/queries.udp')
+ }
+ # Success cases
+ self.assertEqual(self.stats.statistics_data['Stats']['lname'],
+ self.stats.cc_session.lname)
+ self.stats.update_statistics_data(
+ 'Stats', self.stats.cc_session.lname,
+ {'lname': 'foo at bar'})
+ self.assertEqual(self.stats.statistics_data['Stats']['lname'],
+ 'foo at bar')
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'queries.perzone': [_test_exp1]}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['queries.perzone'],\
+ [_test_exp1])
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'queries.perzone': [_test_exp2]}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['queries.perzone'],\
+ [_test_exp2])
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'queries.perzone': [_test_exp1,_test_exp2]}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['queries.perzone'],
+ [_test_exp1,_test_exp2])
+ # differential update
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'queries.perzone': [_test_exp3,_test_exp4]}))
+ _new_data = stats.merge_oldnew(_test_exp2,_test_exp4)
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['queries.perzone'], \
+ [_test_exp1,_new_data])
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', _test_exp5_2))
+ _new_data = stats.merge_oldnew(_new_data,
+ _test_exp5_1['queries.perzone'][1])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['queries.perzone'], \
+ [_test_exp1,_new_data])
+ # Error cases
+ self.assertEqual(self.stats.update_statistics_data('Stats', None,
+ {'lname': 0.0}),
+ ['0.0 should be a string'])
+ self.assertEqual(self.stats.update_statistics_data('Dummy', None,
+ {'foo': 'bar'}),
+ ['unknown module name: Dummy'])
+ self.assertEqual(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'queries.perzone': [None]}), ['None should be a map'])
+
+ def test_update_statistics_data_pt2(self):
+ """test for named_set-type statistics"""
+ self.stats = MyStats()
+ _test_exp1 = \
+ { 'test10.example': { 'queries.tcp': 5, 'queries.udp': 4 } }
+ _test_exp2 = \
+ { 'test20.example': { 'queries.tcp': 3, 'queries.udp': 2 } }
+ _test_exp3 = {}
+ _test_exp4 = { 'test20.example': { 'queries.udp': 4 } }
+ _test_exp5_1 = { 'test10.example': { 'queries.udp': 5432 } }
+ _test_exp5_2 ={
+ 'nds_queries.perzone/test10.example/queries.udp':
+ isc.cc.data.find(_test_exp5_1, 'test10.example/queries.udp')
+ }
+ _test_exp6 = { 'foo/bar': 'brabra' }
+ _test_exp7 = { 'foo[100]': 'bar' }
+ # Success cases
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'nds_queries.perzone': _test_exp1}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ _test_exp1)
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'nds_queries.perzone': _test_exp2}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ dict(_test_exp1,**_test_exp2))
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'nds_queries.perzone':
+ dict(_test_exp1, **_test_exp2)}))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],
+ dict(_test_exp1, **_test_exp2))
+ # differential update
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'nds_queries.perzone':
+ dict(_test_exp3, **_test_exp4)}))
+ _new_val = dict(_test_exp1,
+ **stats.merge_oldnew(_test_exp2,_test_exp4))
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ _new_val)
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo1', _test_exp5_2))
+ _new_val = stats.merge_oldnew(_new_val, _test_exp5_1)
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ _new_val)
+ self.assertIsNone(self.stats.update_statistics_data(
+ 'Auth', 'foo2', _test_exp5_2))
+ _new_val = stats.merge_oldnew(_new_val, _test_exp5_1)
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo2']['nds_queries.perzone'],\
+ _test_exp5_1)
+ # Error cases
+ self.assertEqual(self.stats.update_statistics_data(
+ 'Auth', 'foo1', {'nds_queries.perzone': None}),
+ ['None should be a map'])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ _new_val)
+ self.assertEqual(self.stats.update_statistics_data(
+ 'Auth', 'foo1', _test_exp6), ['unknown item foo'])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']\
+ ['foo1']['nds_queries.perzone'],\
+ _new_val)
+ self.assertEqual(self.stats.update_statistics_data(
+ 'Init', 'bar1', _test_exp7), ["KeyError: 'foo'"])
+ self.assertEqual(self.stats.update_statistics_data(
+ 'Foo', 'foo1', _test_exp6), ['unknown module name: Foo'])
+
+ def test_update_statistics_data_withmid(self):
+ self.stats = MyStats()
+
+ # This test relies on existing statistics data at the Stats object.
+ # This version of test prepares the data using the do_polling() method;
+ # that's a bad practice because a unittest for a method
+ # (update_statistics_data) would heavily depend on details of another
+ # method (do_polling). However, there's currently no direct test
+ # for do_polling (which is also bad), so we still keep that approach,
+ # partly for testing do_polling indirectly. #2781 should provide
+ # direct test for do_polling, with which this test scenario should
+ # also be changed to be more stand-alone.
+
+ # We use the knowledge of what kind of messages are sent via
+ # do_polling, and return the following faked answer directly.
+ create_answer = isc.config.ccsession.create_answer # shortcut
+ self.stats._answers = [
+ # Answer for "show_processes"
+ (create_answer(0, [[1034, 'b10-auth-1', 'Auth'],
+ [1035, 'b10-auth-2', 'Auth']]), None),
+ # Answers for "getstats". 2 for Auth instances and 1 for Init.
+ # we return some bogus values for Init, but the rest of the test
+ # doesn't need it, so it's okay.
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth1'}),
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth2'}),
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth3'})
+ ]
+ # do_polling calls update_modules internally; in our scenario there's
+ # no change in modules, so we make it no-op.
+ self.stats.update_modules = lambda: None
+ # Now call do_polling.
+ self.stats.do_polling()
+
+ # samples of query number
+ bar1_tcp = 1001
+ bar2_tcp = 2001
+ bar3_tcp = 1002
+ bar3_udp = 1003
+ # two auth instances invoked, so we double the pre-set stat values
+ sum_qtcp = self.stats._queries_tcp * 2
+ sum_qudp = self.stats._queries_udp * 2
+ self.stats.update_statistics_data('Auth', "bar1 at foo",
+ {'queries.tcp': bar1_tcp})
+ self.assertTrue('Auth' in self.stats.statistics_data)
+ self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
+ bar1_tcp + sum_qtcp)
+ self.assertTrue('Auth' in self.stats.statistics_data_bymid)
+ self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
+ self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid
+ ['Auth']['bar1 at foo'])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo'],
+ {'queries.tcp': bar1_tcp})
+ # check consolidation of statistics data even if there is
+ # non-existent mid of Auth
+ self.stats.update_statistics_data('Auth', "bar2 at foo",
+ {'queries.tcp': bar2_tcp})
+ self.assertTrue('Auth' in self.stats.statistics_data)
+ self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
+ bar1_tcp + bar2_tcp + sum_qtcp)
+ self.assertTrue('Auth' in self.stats.statistics_data_bymid)
+ self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
+ self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid['Auth']['bar1 at foo'])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo'],
+ {'queries.tcp': bar1_tcp})
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar2 at foo'],
+ {'queries.tcp': bar2_tcp})
+ # kill running Auth but the statistics data doesn't change
+ self.stats.update_statistics_data()
+ self.assertTrue('Auth' in self.stats.statistics_data)
+ self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
+ self.assertTrue('queries.udp' in self.stats.statistics_data['Auth'])
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
+ bar1_tcp + bar2_tcp + sum_qtcp)
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.udp'],
+ sum_qudp)
+ self.assertTrue('Auth' in self.stats.statistics_data_bymid)
+ # restore statistics data of killed auth
+ self.stats.update_statistics_data('Auth',
+ "bar1 at foo",
+ {'queries.tcp': bar1_tcp})
+ # set another mid of Auth
+ self.stats.update_statistics_data('Auth',
+ "bar3 at foo",
+ {'queries.tcp':bar3_tcp,
+ 'queries.udp':bar3_udp})
+ self.assertTrue('Auth' in self.stats.statistics_data)
+ self.assertTrue('queries.tcp' in self.stats.statistics_data['Auth'])
+ self.assertTrue('queries.udp' in self.stats.statistics_data['Auth'])
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.tcp'],
+ bar1_tcp + bar2_tcp + bar3_tcp + sum_qtcp)
+ self.assertEqual(self.stats.statistics_data['Auth']['queries.udp'],
+ bar3_udp + sum_qudp)
+ self.assertTrue('Auth' in self.stats.statistics_data_bymid)
+ self.assertTrue('bar1 at foo' in self.stats.statistics_data_bymid['Auth'])
+ self.assertTrue('bar3 at foo' in self.stats.statistics_data_bymid['Auth'])
+ self.assertTrue('queries.tcp' in self.stats.statistics_data_bymid['Auth']['bar1 at foo'])
+ self.assertTrue('queries.udp' in self.stats.statistics_data_bymid['Auth']['bar3 at foo'])
+ self.assertTrue('queries.udp' in self.stats.statistics_data_bymid['Auth']['bar3 at foo'])
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar1 at foo']['queries.tcp'], bar1_tcp)
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar3 at foo']['queries.tcp'], bar3_tcp)
+ self.assertEqual(self.stats.statistics_data_bymid['Auth']['bar3 at foo']['queries.udp'], bar3_udp)
+
+ def test_config(self):
+ orig_get_timestamp = stats.get_timestamp
+ stats.get_timestamp = lambda : self.const_timestamp
+ stat = MyStats()
+
+ # test updating poll-interval
+ self.assertEqual(stat.config['poll-interval'], 60)
+ self.assertEqual(stat.get_interval(), 60)
+ self.assertEqual(stat.next_polltime, self.const_timestamp + 60)
+ self.assertEqual(stat.config_handler({'poll-interval': 120}),
+ isc.config.create_answer(0))
+ self.assertEqual(stat.config['poll-interval'], 120)
+ self.assertEqual(stat.get_interval(), 120)
+ self.assertEqual(stat.next_polltime, self.const_timestamp + 120)
+ stats.get_timestamp = orig_get_timestamp
+ self.assertEqual(stat.config_handler({'poll-interval': "foo"}),
+ isc.config.create_answer(1, 'foo should be an integer'))
+ self.assertEqual(stat.config_handler({'poll-interval': -1}),
+ isc.config.create_answer(1, 'Negative integer ignored'))
+ # unknown item
+ self.assertEqual(
+ stat.config_handler({'_UNKNOWN_KEY_': None}),
+ isc.config.ccsession.create_answer(
+ 1, "unknown item _UNKNOWN_KEY_"))
+ # test no change if zero interval time
+ self.assertEqual(stat.config_handler({'poll-interval': 0}),
+ isc.config.create_answer(0))
+ self.assertEqual(stat.config['poll-interval'], 0)
+
+ # see the comment for test_update_statistics_data_withmid. We abuse
+ # do_polling here, too. With #2781 we should make it more direct.
+ create_answer = isc.config.ccsession.create_answer # shortcut
+ stat._answers = [\
+ # Answer for "show_processes"
+ (create_answer(0, []), None),
+ # Answers for "getstats" for Init (the other one for Auth, but
+ # that doesn't matter for this test)
+ (create_answer(0, stat._init_sdata), {'from': 'init'}),
+ (create_answer(0, stat._init_sdata), {'from': 'init'})
+ ]
+ stat.update_modules = lambda: None
+
+ self.assertEqual(
+ self.__send_command(
+ stat, 'show',
+ params={ 'owner' : 'Init', 'name' : 'boot_time' }),
+ (0, {'Init': {'boot_time': self.const_datetime}}))
+
+ def test_commands(self):
+ self.stats = MyStats()
+
+ # status
+ self.assertEqual(self.stats.command_status(),
+ isc.config.create_answer(
+ 0, "Stats is up. (PID " + str(os.getpid()) + ")"))
+
+ # shutdown
+ self.stats.running = True
+ self.assertEqual(self.stats.command_shutdown(),
+ isc.config.create_answer(0))
+ self.assertFalse(self.stats.running)
+
+ def test_command_show_error(self):
+ self.stats = MyStats()
+ self.assertEqual(self.stats.command_show(owner='Foo', name=None),
+ isc.config.create_answer(
+ 1,
+ "specified arguments are incorrect: owner: Foo, name: None"))
+ self.assertEqual(self.stats.command_show(owner='Foo', name='_bar_'),
+ isc.config.create_answer(
+ 1,
+ "specified arguments are incorrect: owner: Foo, name: _bar_"))
+ self.assertEqual(self.stats.command_show(owner='Foo', name='bar'),
+ isc.config.create_answer(
+ 1,
+ "specified arguments are incorrect: owner: Foo, name: bar"))
+
+ def test_command_show_auth(self):
+ self.stats = MyStats()
+ self.stats.update_modules = lambda: None
+
+ # Test data borrowed from test_update_statistics_data_withmid
+ create_answer = isc.config.ccsession.create_answer # shortcut
+ self.stats._answers = [
+ (create_answer(0, [[1034, 'b10-auth-1', 'Auth'],
+ [1035, 'b10-auth-2', 'Auth']]), None),
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth1'}),
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth2'}),
+ (create_answer(0, self.stats._auth_sdata), {'from': 'auth3'})
+ ]
+
+ num_instances = 2
+ sum_qtcp = 0
+ sum_qudp = 0
+ sum_qtcp_perzone1 = 0
+ sum_qudp_perzone1 = 0
+ sum_qtcp_perzone2 = 4 * num_instances
+ sum_qudp_perzone2 = 3 * num_instances
+ sum_qtcp_nds_perzone10 = 0
+ sum_qudp_nds_perzone10 = 0
+ sum_qtcp_nds_perzone20 = 4 * num_instances
+ sum_qudp_nds_perzone20 = 3 * num_instances
+
+ self.maxDiff = None
+ for a in (0, num_instances):
+ sum_qtcp += self.stats._queries_tcp
+ sum_qudp += self.stats._queries_udp
+ sum_qtcp_perzone1 += self.stats._queries_per_zone[0]['queries.tcp']
+ sum_qudp_perzone1 += self.stats._queries_per_zone[0]['queries.udp']
+ sum_qtcp_nds_perzone10 += \
+ self.stats._nds_queries_per_zone['test10.example']['queries.tcp']
+ sum_qudp_nds_perzone10 += \
+ self.stats._nds_queries_per_zone['test10.example']['queries.udp']
+
+ self.assertEqual(self.stats.command_show(owner='Auth'),
+ isc.config.create_answer(
+ 0, {'Auth':{ 'queries.udp': sum_qudp,
+ 'queries.tcp': sum_qtcp,
+ 'queries.perzone': [{ 'zonename': 'test1.example',
+ 'queries.udp': sum_qudp_perzone1,
+ 'queries.tcp': sum_qtcp_perzone1 },
+ { 'zonename': 'test2.example',
+ 'queries.udp': sum_qudp_perzone2,
+ 'queries.tcp': sum_qtcp_perzone2 }
+ ],
+ 'nds_queries.perzone': { 'test10.example' : {
+ 'queries.udp': sum_qudp_nds_perzone10,
+ 'queries.tcp': sum_qtcp_nds_perzone10 },
+ 'test20.example' : {
+ 'queries.udp': sum_qudp_nds_perzone20,
+ 'queries.tcp': sum_qtcp_nds_perzone20 }
+ }}}))
+ self.assertEqual(self.stats.command_show(owner='Auth', name='queries.udp'),
+ isc.config.create_answer(
+ 0, {'Auth': {'queries.udp': sum_qudp}}))
+ self.assertEqual(self.stats.command_show(owner='Auth', name='queries.perzone'),
+ isc.config.create_answer(
+ 0, {'Auth': {'queries.perzone': [
+ { 'zonename': 'test1.example',
+ 'queries.udp': sum_qudp_perzone1,
+ 'queries.tcp': sum_qtcp_perzone1 },
+ { 'zonename': 'test2.example',
+ 'queries.udp': sum_qudp_perzone2,
+ 'queries.tcp': sum_qtcp_perzone2 }]}}))
+ self.assertEqual(self.stats.command_show(owner='Auth', name='nds_queries.perzone'),
+ isc.config.create_answer(
+ 0, {'Auth': {'nds_queries.perzone': {
+ 'test10.example': {
+ 'queries.udp': sum_qudp_nds_perzone10,
+ 'queries.tcp': sum_qtcp_nds_perzone10 },
+ 'test20.example': {
+ 'queries.udp': sum_qudp_nds_perzone20,
+ 'queries.tcp': sum_qtcp_nds_perzone20 }}}}))
+
+ def test_command_show_stats(self):
+ self.stats = MyStats()
+ orig_get_datetime = stats.get_datetime
+ orig_get_timestamp = stats.get_timestamp
+ stats.get_datetime = lambda x=None: self.const_datetime
+ stats.get_timestamp = lambda : self.const_timestamp
+ self.assertEqual(self.stats.command_show(owner='Stats',
+ name='report_time'),
+ isc.config.create_answer(
+ 0, {'Stats': {'report_time':self.const_datetime}}))
+ self.assertEqual(self.stats.command_show(owner='Stats',
+ name='timestamp'),
+ isc.config.create_answer(
+ 0, {'Stats': {'timestamp':self.const_timestamp}}))
+ stats.get_datetime = orig_get_datetime
+ stats.get_timestamp = orig_get_timestamp
+ self.stats.do_polling = lambda : None
+ self.stats.modules[self.stats.module_name] = \
+ isc.config.module_spec.ModuleSpec(
+ { "module_name": self.stats.module_name, "statistics": [] } )
+ self.assertRaises(
+ stats.StatsError, self.stats.command_show,
+ owner=self.stats.module_name, name='bar')
+
+ def test_command_showchema(self):
+ self.stats = MyStats()
+ (rcode, value) = isc.config.ccsession.parse_answer(
+ self.stats.command_showschema())
+ self.assertEqual(rcode, 0)
+ self.assertEqual(len(value), 3)
+ self.assertTrue('Stats' in value)
+ self.assertTrue('Init' in value)
+ self.assertTrue('Auth' in value)
+ self.assertFalse('__Dummy__' in value)
+ schema = value['Stats']
+ self.assertEqual(len(schema), 5)
+ for item in schema:
+ self.assertTrue(len(item) == 6 or len(item) == 7)
+ self.assertTrue('item_name' in item)
+ self.assertTrue('item_type' in item)
+ self.assertTrue('item_optional' in item)
+ self.assertTrue('item_default' in item)
+ self.assertTrue('item_title' in item)
+ self.assertTrue('item_description' in item)
+ if len(item) == 7:
+ self.assertTrue('item_format' in item)
+
+ schema = value['Init']
+ self.assertEqual(len(schema), 1)
+ for item in schema:
+ self.assertTrue(len(item) == 7)
+ self.assertTrue('item_name' in item)
+ self.assertTrue('item_type' in item)
+ self.assertTrue('item_optional' in item)
+ self.assertTrue('item_default' in item)
+ self.assertTrue('item_title' in item)
+ self.assertTrue('item_description' in item)
+ self.assertTrue('item_format' in item)
+
+ schema = value['Auth']
+ self.assertEqual(len(schema), 4)
+ for item in schema:
+ if item['item_type'] == 'list' or item['item_type'] == 'named_set':
+ self.assertEqual(len(item), 7)
+ else:
+ self.assertEqual(len(item), 6)
+ self.assertTrue('item_name' in item)
+ self.assertTrue('item_type' in item)
+ self.assertTrue('item_optional' in item)
+ self.assertTrue('item_default' in item)
+ self.assertTrue('item_title' in item)
+ self.assertTrue('item_description' in item)
+
+ (rcode, value) = isc.config.ccsession.parse_answer(
+ self.stats.command_showschema(owner='Stats'))
+ self.assertEqual(rcode, 0)
+ self.assertTrue('Stats' in value)
+ self.assertFalse('Init' in value)
+ self.assertFalse('Auth' in value)
+ for item in value['Stats']:
+ self.assertTrue(len(item) == 6 or len(item) == 7)
+ self.assertTrue('item_name' in item)
+ self.assertTrue('item_type' in item)
+ self.assertTrue('item_optional' in item)
+ self.assertTrue('item_default' in item)
+ self.assertTrue('item_title' in item)
+ self.assertTrue('item_description' in item)
+ if len(item) == 7:
+ self.assertTrue('item_format' in item)
+
+ (rcode, value) = isc.config.ccsession.parse_answer(
+ self.stats.command_showschema(owner='Stats', name='report_time'))
+ self.assertEqual(rcode, 0)
+ self.assertTrue('Stats' in value)
+ self.assertFalse('Init' in value)
+ self.assertFalse('Auth' in value)
+ self.assertEqual(len(value['Stats'][0]), 7)
+ self.assertTrue('item_name' in value['Stats'][0])
+ self.assertTrue('item_type' in value['Stats'][0])
+ self.assertTrue('item_optional' in value['Stats'][0])
+ self.assertTrue('item_default' in value['Stats'][0])
+ self.assertTrue('item_title' in value['Stats'][0])
+ self.assertTrue('item_description' in value['Stats'][0])
+ self.assertTrue('item_format' in value['Stats'][0])
+ self.assertEqual(value['Stats'][0]['item_name'], 'report_time')
+ self.assertEqual(value['Stats'][0]['item_format'], 'date-time')
+
+ self.assertEqual(self.stats.command_showschema(owner='Foo'),
+ isc.config.create_answer(
+ 1, "specified arguments are incorrect: owner: Foo, name: None"))
+ self.assertEqual(self.stats.command_showschema(owner='Foo', name='bar'),
+ isc.config.create_answer(
+ 1, "specified arguments are incorrect: owner: Foo, name: bar"))
+ self.assertEqual(self.stats.command_showschema(owner='Auth'),
+ isc.config.create_answer(
+ 0, {'Auth': [{
+ "item_default": 0,
+ "item_description": "A number of total query counts which all auth servers receive over TCP since they started initially",
+ "item_name": "queries.tcp",
+ "item_optional": False,
+ "item_title": "Queries TCP",
+ "item_type": "integer"
+ },
+ {
+ "item_default": 0,
+ "item_description": "A number of total query counts which all auth servers receive over UDP since they started initially",
+ "item_name": "queries.udp",
+ "item_optional": False,
+ "item_title": "Queries UDP",
+ "item_type": "integer"
+ },
+ {
+ "item_name": "queries.perzone",
+ "item_type": "list",
+ "item_optional": False,
+ "item_default": [
+ {
+ "zonename" : "test1.example",
+ "queries.udp" : 1,
+ "queries.tcp" : 2
+ },
+ {
+ "zonename" : "test2.example",
+ "queries.udp" : 3,
+ "queries.tcp" : 4
+ }
+ ],
+ "item_title": "Queries per zone",
+ "item_description": "Queries per zone",
+ "list_item_spec": {
+ "item_name": "zones",
+ "item_type": "map",
+ "item_optional": False,
+ "item_default": {},
+ "map_item_spec": [
+ {
+ "item_name": "zonename",
+ "item_type": "string",
+ "item_optional": False,
+ "item_default": "",
+ "item_title": "Zonename",
+ "item_description": "Zonename"
+ },
+ {
+ "item_name": "queries.udp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries UDP per zone",
+ "item_description": "A number of UDP query counts per zone"
+ },
+ {
+ "item_name": "queries.tcp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries TCP per zone",
+ "item_description": "A number of TCP query counts per zone"
+ }
+ ]
+ }
+ },
+ {
+ "item_name": "nds_queries.perzone",
+ "item_type": "named_set",
+ "item_optional": False,
+ "item_default": {
+ "test10.example" : {
+ "queries.udp" : 1,
+ "queries.tcp" : 2
+ },
+ "test20.example" : {
+ "queries.udp" : 3,
+ "queries.tcp" : 4
+ }
+ },
+ "item_title": "Queries per zone",
+ "item_description": "Queries per zone",
+ "named_set_item_spec": {
+ "item_name": "zonename",
+ "item_type": "map",
+ "item_optional": False,
+ "item_default": {},
+ "item_title": "Zonename",
+ "item_description": "Zonename",
+ "map_item_spec": [
+ {
+ "item_name": "queries.udp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries UDP per zone",
+ "item_description": "A number of UDP query counts per zone"
+ },
+ {
+ "item_name": "queries.tcp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries TCP per zone",
+ "item_description": "A number of TCP query counts per zone"
+ }
+ ]
+ }
+ }]}))
+ self.assertEqual(self.stats.command_showschema(owner='Auth', name='queries.tcp'),
+ isc.config.create_answer(
+ 0, {'Auth': [{
+ "item_default": 0,
+ "item_description": "A number of total query counts which all auth servers receive over TCP since they started initially",
+ "item_name": "queries.tcp",
+ "item_optional": False,
+ "item_title": "Queries TCP",
+ "item_type": "integer"
+ }]}))
+ self.assertEqual(self.stats.command_showschema(owner='Auth', name='queries.perzone'),
+ isc.config.create_answer(
+ 0, {'Auth':[{
+ "item_name": "queries.perzone",
+ "item_type": "list",
+ "item_optional": False,
+ "item_default": [
+ {
+ "zonename" : "test1.example",
+ "queries.udp" : 1,
+ "queries.tcp" : 2
+ },
+ {
+ "zonename" : "test2.example",
+ "queries.udp" : 3,
+ "queries.tcp" : 4
+ }
+ ],
+ "item_title": "Queries per zone",
+ "item_description": "Queries per zone",
+ "list_item_spec": {
+ "item_name": "zones",
+ "item_type": "map",
+ "item_optional": False,
+ "item_default": {},
+ "map_item_spec": [
+ {
+ "item_name": "zonename",
+ "item_type": "string",
+ "item_optional": False,
+ "item_default": "",
+ "item_title": "Zonename",
+ "item_description": "Zonename"
+ },
+ {
+ "item_name": "queries.udp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries UDP per zone",
+ "item_description": "A number of UDP query counts per zone"
+ },
+ {
+ "item_name": "queries.tcp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries TCP per zone",
+ "item_description": "A number of TCP query counts per zone"
+ }
+ ]
+ }
+ }]}))
+ self.assertEqual(self.stats.command_showschema(owner='Auth', name='nds_queries.perzone'),
+ isc.config.create_answer(
+ 0, {'Auth':[{
+ "item_name": "nds_queries.perzone",
+ "item_type": "named_set",
+ "item_optional": False,
+ "item_default": {
+ "test10.example" : {
+ "queries.udp" : 1,
+ "queries.tcp" : 2
+ },
+ "test20.example" : {
+ "queries.udp" : 3,
+ "queries.tcp" : 4
+ }
+ },
+ "item_title": "Queries per zone",
+ "item_description": "Queries per zone",
+ "named_set_item_spec": {
+ "item_name": "zonename",
+ "item_type": "map",
+ "item_optional": False,
+ "item_default": {},
+ "item_title": "Zonename",
+ "item_description": "Zonename",
+ "map_item_spec": [
+ {
+ "item_name": "queries.udp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries UDP per zone",
+ "item_description": "A number of UDP query counts per zone"
+ },
+ {
+ "item_name": "queries.tcp",
+ "item_type": "integer",
+ "item_optional": False,
+ "item_default": 0,
+ "item_title": "Queries TCP per zone",
+ "item_description": "A number of TCP query counts per zone"
+ }
+ ]
+ }
+ }]}))
+
+ self.assertEqual(self.stats.command_showschema(owner='Stats', name='bar'),
+ isc.config.create_answer(
+ 1, "specified arguments are incorrect: owner: Stats, name: bar"))
+ self.assertEqual(self.stats.command_showschema(name='bar'),
+ isc.config.create_answer(
+ 1, "module name is not specified"))
+
+ def test_polling_init(self):
+ """check statistics data of 'Init'."""
+
+ stat = MyStats()
+ stat.update_modules = lambda: None
+ create_answer = isc.config.ccsession.create_answer # shortcut
+
+ stat._answers = [
+ # Answer for "show_processes"
+ (create_answer(0, []), None),
+ # Answers for "getstats" for Init (type of boot_time is invalid)
+ (create_answer(0, {'boot_time': self.const_datetime}),
+ {'from': 'init'}),
+ ]
+
+ stat.do_polling()
+ self.assertEqual(
+ stat.statistics_data_bymid['Init']['init'],
+ {'boot_time': self.const_datetime})
+
+ def test_polling_consolidate(self):
+ """check statistics data of multiple instances of same module."""
+ stat = MyStats()
+ stat.update_modules = lambda: None
+ create_answer = isc.config.ccsession.create_answer # shortcut
+
+ # Test data borrowed from test_update_statistics_data_withmid
+ stat._answers = [
+ (create_answer(0, [[1034, 'b10-auth-1', 'Auth'],
+ [1035, 'b10-auth-2', 'Auth']]), None),
+ (create_answer(0, stat._auth_sdata), {'from': 'auth1'}),
+ (create_answer(0, stat._auth_sdata), {'from': 'auth2'}),
+ (create_answer(0, stat._auth_sdata), {'from': 'auth3'})
+ ]
+
+ stat.do_polling()
+
+ # check statistics data of each 'Auth' instances. expected data
+ # for 'nds_queries.perzone' is special as it needs data merge.
+ self.assertEqual(2, len(stat.statistics_data_bymid['Auth'].values()))
+ for s in stat.statistics_data_bymid['Auth'].values():
+ self.assertEqual(
+ s, {'queries.perzone': stat._auth_sdata['queries.perzone'],
+ 'nds_queries.perzone': stat._nds_queries_per_zone,
+ 'queries.tcp': stat._auth_sdata['queries.tcp'],
+ 'queries.udp': stat._auth_sdata['queries.udp']})
+
+ # check consolidation of statistics data of the auth instances.
+ # it's union of the reported data and the spec default.
+ n = len(stat.statistics_data_bymid['Auth'].values())
+ self.maxDiff = None
+ self.assertEqual(
+ stat.statistics_data['Auth'],
+ {'queries.perzone': [
+ {'zonename': 'test1.example',
+ 'queries.tcp': 5 * n,
+ 'queries.udp': 4 * n},
+ {'zonename': 'test2.example',
+ 'queries.tcp': 4 * n,
+ 'queries.udp': 3 * n},
+ ],
+ 'nds_queries.perzone': {
+ 'test10.example': {
+ 'queries.tcp': 5 * n,
+ 'queries.udp': 4 * n
+ },
+ 'test20.example': {
+ 'queries.tcp': 4 * n,
+ 'queries.udp': 3 * n
+ },
+ },
+ 'queries.tcp': 3 * n,
+ 'queries.udp': 2 * n})
+
+ def test_polling_stats(self):
+ """Check statistics data of 'Stats'
+
+ This is actually irrelevant to do_polling(), but provided to
+ compatibility of older tests.
+
+ """
+ stat = MyStats()
+ self.assertEqual(len(stat.statistics_data['Stats']), 5)
+ self.assertTrue('boot_time' in stat.statistics_data['Stats'])
+ self.assertTrue('last_update_time' in stat.statistics_data['Stats'])
+ self.assertTrue('report_time' in stat.statistics_data['Stats'])
+ self.assertTrue('timestamp' in stat.statistics_data['Stats'])
+ self.assertEqual(stat.statistics_data['Stats']['lname'],
+ stat.mccs._session.lname)
+
+ def test_polling2(self):
+ """Test do_polling() doesn't incorporate broken statistics data.
+
+ Actually, this is not a test for do_polling() itself. It's bad, but
+ fixing that is a subject of different ticket.
+
+ """
+ stat = MyStats()
+ # check default statistics data of 'Init'
+ self.assertEqual(
+ stat.statistics_data['Init'],
+ {'boot_time': self.const_default_datetime})
+
+ # set invalid statistics
+ create_answer = isc.config.ccsession.create_answer # shortcut
+ stat._answers = [
+ # Answer for "show_processes"
+ (create_answer(0, []), None),
+ # Answers for "getstats" for Init (type of boot_time is invalid)
+ (create_answer(0, {'boot_time': 1}), {'from': 'init'}),
+ ]
+ stat.update_modules = lambda: None
+
+ # do_polling() should ignore the invalid answer;
+ # default data shouldn't be replaced.
+ stat.do_polling()
+ self.assertEqual(
+ stat.statistics_data['Init'],
+ {'boot_time': self.const_default_datetime})
+
+class TestOSEnv(unittest.TestCase):
+ def test_osenv(self):
+ """
+ test for the environ variable "B10_FROM_SOURCE"
+ "B10_FROM_SOURCE" is set in Makefile
+ """
+ # test case having B10_FROM_SOURCE
+ self.assertTrue("B10_FROM_SOURCE" in os.environ)
+ self.assertEqual(stats.SPECFILE_LOCATION, \
+ os.environ["B10_FROM_SOURCE"] + os.sep + \
+ "src" + os.sep + "bin" + os.sep + "stats" + \
+ os.sep + "stats.spec")
+ # test case not having B10_FROM_SOURCE
+ path = os.environ["B10_FROM_SOURCE"]
+ os.environ.pop("B10_FROM_SOURCE")
+ self.assertFalse("B10_FROM_SOURCE" in os.environ)
+ # import stats again
+ imp.reload(stats)
+ # revert the changes
+ os.environ["B10_FROM_SOURCE"] = path
+ imp.reload(stats)
+
+if __name__ == "__main__":
+ isc.log.resetUnitTestRootLogger()
+ unittest.main()
diff --git a/src/bin/stats/tests/test_utils.py b/src/bin/stats/tests/test_utils.py
index 45e9a17..8b45f26 100644
--- a/src/bin/stats/tests/test_utils.py
+++ b/src/bin/stats/tests/test_utils.py
@@ -20,13 +20,11 @@ Utilities and mock modules for unittests of statistics modules
import os
import io
import time
-import sys
import threading
-import tempfile
import json
import signal
+import socket
-import msgq
import isc.config.cfgmgr
import stats
import stats_httpd
@@ -51,19 +49,6 @@ class SignalHandler():
"""invokes unittest.TestCase.fail as a signal handler"""
self.fail_handler("A deadlock might be detected")
-def send_command(command_name, module_name, params=None):
- cc_session = isc.cc.Session()
- command = isc.config.ccsession.create_command(command_name, params)
- seq = cc_session.group_sendmsg(command, module_name)
- try:
- (answer, env) = cc_session.group_recvmsg(False, seq)
- if answer:
- return isc.config.ccsession.parse_answer(answer)
- except isc.cc.SessionTimeout:
- pass
- finally:
- cc_session.close()
-
class ThreadingServerManager:
def __init__(self, server, *args, **kwargs):
self.server = server(*args, **kwargs)
@@ -91,45 +76,7 @@ class ThreadingServerManager:
else:
self.server._thread.join(0) # timeout is 0
-class MockMsgq:
- def __init__(self):
- self._started = threading.Event()
- self.msgq = msgq.MsgQ(verbose=False)
- result = self.msgq.setup()
- if result:
- sys.exit("Error on Msgq startup: %s" % result)
-
- def run(self):
- self._started.set()
- try:
- self.msgq.run()
- finally:
- # Make sure all the sockets, etc, are removed once it stops.
- self.msgq.shutdown()
-
- def shutdown(self):
- # Ask it to terminate nicely
- self.msgq.stop()
-
-class MockCfgmgr:
- def __init__(self):
- self._started = threading.Event()
- self.cfgmgr = isc.config.cfgmgr.ConfigManager(
- os.environ['CONFIG_TESTDATA_PATH'], "b10-config.db")
- self.cfgmgr.read_config()
-
- def run(self):
- self._started.set()
- try:
- self.cfgmgr.run()
- except Exception:
- pass
-
- def shutdown(self):
- self.cfgmgr.running = False
-
-class MockInit:
- spec_str = """\
+INIT_SPEC_STR = """\
{
"module_spec": {
"module_name": "Init",
@@ -221,56 +168,12 @@ class MockInit:
}
}
"""
- _BASETIME = CONST_BASETIME
- def __init__(self):
- self._started = threading.Event()
- self.running = False
- self.spec_file = io.StringIO(self.spec_str)
- # create ModuleCCSession object
- self.mccs = isc.config.ModuleCCSession(
- self.spec_file,
- self.config_handler,
- self.command_handler)
- self.spec_file.close()
- self.cc_session = self.mccs._session
- self.got_command_name = ''
- self.pid_list = [[ 9999, "b10-auth", "Auth" ],
- [ 9998, "b10-auth-2", "Auth" ]]
- self.statistics_data = {
- 'boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', self._BASETIME)
- }
-
- def run(self):
- self.mccs.start()
- self.running = True
- self._started.set()
- try:
- while self.running:
- self.mccs.check_command(False)
- except Exception:
- pass
-
- def shutdown(self):
- self.running = False
-
- def config_handler(self, new_config):
- return isc.config.create_answer(0)
-
- def command_handler(self, command, *args, **kwargs):
- self._started.set()
- self.got_command_name = command
- sdata = self.statistics_data
- if command == 'getstats':
- return isc.config.create_answer(0, sdata)
- elif command == 'show_processes':
- # Return dummy pids
- return isc.config.create_answer(
- 0, self.pid_list)
- return isc.config.create_answer(1, "Unknown Command")
-
-class MockAuth:
- spec_str = """\
+# Note: this is derived of the spec for the DNS authoritative server, but
+# for the purpose of this test, it's completely irrelevant to DNS.
+# Some statisittics specs do not make sense for practical sense but used
+# just cover various types of statistics data (list, map/dict, etc).
+AUTH_SPEC_STR = """\
{
"module_spec": {
"module_name": "Auth",
@@ -392,68 +295,6 @@ class MockAuth:
}
}
"""
- def __init__(self):
- self._started = threading.Event()
- self.running = False
- self.spec_file = io.StringIO(self.spec_str)
- # create ModuleCCSession object
- self.mccs = isc.config.ModuleCCSession(
- self.spec_file,
- self.config_handler,
- self.command_handler)
- self.spec_file.close()
- self.cc_session = self.mccs._session
- self.got_command_name = ''
- self.queries_tcp = 3
- self.queries_udp = 2
- self.queries_per_zone = [{
- 'zonename': 'test1.example',
- 'queries.tcp': 5,
- 'queries.udp': 4
- }]
- self.nds_queries_per_zone = {
- 'test10.example': {
- 'queries.tcp': 5,
- 'queries.udp': 4
- }
- }
-
- def run(self):
- self.mccs.start()
- self.running = True
- self._started.set()
- try:
- while self.running:
- self.mccs.check_command(False)
- except Exception:
- pass
-
- def shutdown(self):
- self.running = False
-
- def config_handler(self, new_config):
- return isc.config.create_answer(0)
-
- def command_handler(self, command, *args, **kwargs):
- self.got_command_name = command
- sdata = { 'queries.tcp': self.queries_tcp,
- 'queries.udp': self.queries_udp,
- 'queries.perzone' : self.queries_per_zone,
- 'nds_queries.perzone' : {
- 'test10.example': {
- 'queries.tcp': \
- isc.cc.data.find(
- self.nds_queries_per_zone,
- 'test10.example/queries.tcp')
- }
- },
- 'nds_queries.perzone/test10.example/queries.udp' :
- isc.cc.data.find(self.nds_queries_per_zone,
- 'test10.example/queries.udp')
- }
- if command == 'getstats':
- return isc.config.create_answer(0, sdata)
- return isc.config.create_answer(1, "Unknown Command")
class MyModuleCCSession(isc.config.ConfigData):
"""Mocked ModuleCCSession class.
@@ -468,6 +309,7 @@ class MyModuleCCSession(isc.config.ConfigData):
isc.config.ConfigData.__init__(self, module_spec)
self._session = self
self.stopped = False
+ self.closed = False
self.lname = 'mock_mod_ccs'
def start(self):
@@ -476,10 +318,13 @@ class MyModuleCCSession(isc.config.ConfigData):
def send_stopping(self):
self.stopped = True # just record it's called to inspect it later
-class SimpleStats(stats.Stats):
+ def close(self):
+ self.closed = True
+
+class MyStats(stats.Stats):
"""A faked Stats class for unit tests.
- This class inherits most of the real Stats class, but replace the
+ This class inherits most of the real Stats class, but replaces the
ModuleCCSession with a fake one so we can avoid network I/O in tests,
and can also inspect or tweak messages via the session more easily.
This class also maintains some faked module information and statistics
@@ -500,9 +345,9 @@ class SimpleStats(stats.Stats):
# the default answer from faked recvmsg if _answers is empty
self.__default_answer = isc.config.ccsession.create_answer(
0, {'Init':
- json.loads(MockInit.spec_str)['module_spec']['statistics'],
+ json.loads(INIT_SPEC_STR)['module_spec']['statistics'],
'Auth':
- json.loads(MockAuth.spec_str)['module_spec']['statistics']
+ json.loads(AUTH_SPEC_STR)['module_spec']['statistics']
})
# setup faked auth statistics
self.__init_auth_stat()
@@ -530,24 +375,24 @@ class SimpleStats(stats.Stats):
def __init_auth_stat(self):
self._queries_tcp = 3
self._queries_udp = 2
- self.__queries_per_zone = [{
+ self._queries_per_zone = [{
'zonename': 'test1.example', 'queries.tcp': 5, 'queries.udp': 4
}]
- self.__nds_queries_per_zone = \
+ self._nds_queries_per_zone = \
{ 'test10.example': { 'queries.tcp': 5, 'queries.udp': 4 } }
self._auth_sdata = \
{ 'queries.tcp': self._queries_tcp,
'queries.udp': self._queries_udp,
- 'queries.perzone' : self.__queries_per_zone,
+ 'queries.perzone' : self._queries_per_zone,
'nds_queries.perzone' : {
'test10.example': {
'queries.tcp': isc.cc.data.find(
- self.__nds_queries_per_zone,
+ self._nds_queries_per_zone,
'test10.example/queries.tcp')
}
},
'nds_queries.perzone/test10.example/queries.udp' :
- isc.cc.data.find(self.__nds_queries_per_zone,
+ isc.cc.data.find(self._nds_queries_per_zone,
'test10.example/queries.udp')
}
@@ -589,32 +434,62 @@ class SimpleStats(stats.Stats):
answer, _ = self.__group_recvmsg(None, None)
return isc.config.ccsession.parse_answer(answer)[1]
-class MyStats(stats.Stats):
-
- stats._BASETIME = CONST_BASETIME
- stats.get_timestamp = lambda: time.mktime(CONST_BASETIME)
- stats.get_datetime = lambda x=None: time.strftime("%Y-%m-%dT%H:%M:%SZ", CONST_BASETIME)
-
- def __init__(self):
- self._started = threading.Event()
- stats.Stats.__init__(self)
+class MyStatsHttpd(stats_httpd.StatsHttpd):
+ """A faked StatsHttpd class for unit tests.
- def run(self):
- self._started.set()
- try:
- self.start()
- except Exception:
- pass
+ This class inherits most of the real StatsHttpd class, but replaces the
+ ModuleCCSession with a fake one so we can avoid network I/O in tests,
+ and can also inspect or tweak messages via the session more easily.
- def shutdown(self):
- self.command_shutdown()
+ """
-class MyStatsHttpd(stats_httpd.StatsHttpd):
ORIG_SPECFILE_LOCATION = stats_httpd.SPECFILE_LOCATION
def __init__(self, *server_address):
self._started = threading.Event()
+ self.__dummy_socks = None # see below
+
+ # Prepare commonly used statistics schema and data requested in
+ # stats-httpd tests. For the purpose of these tests, the content of
+ # statistics data is not so important (they don't test whther the
+ # counter values are correct, etc), so hardcoding the common case
+ # should suffice. Note also that some of the statistics values and
+ # specs don't make sense in practice (see also comments on
+ # AUTH_SPEC_STR).
+ with open(stats.SPECFILE_LOCATION) as f:
+ stat_spec_str = f.read()
+ self.__default_spec_answer = {
+ 'Init': json.loads(INIT_SPEC_STR)['module_spec']['statistics'],
+ 'Auth': json.loads(AUTH_SPEC_STR)['module_spec']['statistics'],
+ 'Stats': json.loads(stat_spec_str)['module_spec']['statistics']
+ }
+ self.__default_data_answer = {
+ 'Init': {'boot_time':
+ time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME)},
+ 'Stats': {'last_update_time':
+ time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ 'report_time':
+ time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ 'lname': 'test-lname',
+ 'boot_time':
+ time.strftime('%Y-%m-%dT%H:%M:%SZ', CONST_BASETIME),
+ 'timestamp': time.mktime(CONST_BASETIME)},
+ 'Auth': {'queries.udp': 4, 'queries.tcp': 6,
+ 'queries.perzone': [
+ {'queries.udp': 8, 'queries.tcp': 10,
+ 'zonename': 'test1.example'},
+ {'queries.udp': 6, 'queries.tcp': 8,
+ 'zonename': 'test2.example'}],
+ 'nds_queries.perzone': {
+ 'test10.example': {'queries.udp': 8, 'queries.tcp': 10},
+ 'test20.example': {'queries.udp': 6, 'queries.tcp': 8}}}}
+
+ # if set, use them as faked response to rpc_call (see below).
+ # it's a list of answer data of rpc_call.
+ self._rpc_answers = []
+
if server_address:
- stats_httpd.SPECFILE_LOCATION = self.create_specfile(*server_address)
+ stats_httpd.SPECFILE_LOCATION = \
+ self.__create_specfile(*server_address)
try:
stats_httpd.StatsHttpd.__init__(self)
finally:
@@ -624,7 +499,51 @@ class MyStatsHttpd(stats_httpd.StatsHttpd):
else:
stats_httpd.StatsHttpd.__init__(self)
- def create_specfile(self, *server_address):
+ # replace some (faked) ModuleCCSession methods so we can inspect/fake.
+ # in order to satisfy select.select() we need some real socket. We
+ # use a socketpair(), but won't actually use it for communication.
+ self.cc_session.rpc_call = self.__rpc_call
+ self.__dummy_socks = socket.socketpair()
+ self.mccs.get_socket = lambda: self.__dummy_socks[0]
+
+ def open_mccs(self):
+ self.mccs = MyModuleCCSession(stats_httpd.SPECFILE_LOCATION,
+ self.config_handler,
+ self.command_handler)
+ self.cc_session = self.mccs._session
+ self.mccs.start = self.load_config # force reload
+
+ def close_mccs(self):
+ super().close_mccs()
+ if self.__dummy_socks is not None:
+ self.__dummy_socks[0].close()
+ self.__dummy_socks[1].close()
+ self.__dummy_socks = None
+
+ def __rpc_call(self, command, group, params={}):
+ """Faked ModuleCCSession.rpc_call for tests.
+
+ The stats httpd module only issues two commands: 'showschema' and
+ 'show'. In most cases we can simply use the prepared default
+ answer. If customization is needed, the test case can add a
+ faked answer by appending it to _rpc_answers. If the added object
+ is of Exception type this method raises it instead of return it,
+ emulating the situation where rpc_call() results in an exception.
+
+ """
+ if len(self._rpc_answers) == 0:
+ if command == 'showschema':
+ return self.__default_spec_answer
+ elif command == 'show':
+ return self.__default_data_answer
+ assert False, "unexpected command for faked rpc_call: " + command
+
+ answer = self._rpc_answers.pop(0)
+ if issubclass(type(answer), Exception):
+ raise answer
+ return answer
+
+ def __create_specfile(self, *server_address):
spec_io = open(self.ORIG_SPECFILE_LOCATION)
try:
spec = json.load(spec_io)
@@ -633,7 +552,8 @@ class MyStatsHttpd(stats_httpd.StatsHttpd):
for i in range(len(config)):
if config[i]['item_name'] == 'listen_on':
config[i]['item_default'] = \
- [ dict(address=a[0], port=a[1]) for a in server_address ]
+ [ dict(address=a[0], port=a[1])
+ for a in server_address ]
break
return io.StringIO(json.dumps(spec))
finally:
@@ -641,53 +561,4 @@ class MyStatsHttpd(stats_httpd.StatsHttpd):
def run(self):
self._started.set()
- try:
- self.start()
- except Exception:
- pass
-
- def shutdown(self):
- self.command_handler('shutdown', None)
-
-class BaseModules:
- def __init__(self):
- # MockMsgq
- self.msgq = ThreadingServerManager(MockMsgq)
- self.msgq.run()
- # Check whether msgq is ready. A SessionTimeout is raised here if not.
- isc.cc.session.Session().close()
- # MockCfgmgr
- self.cfgmgr = ThreadingServerManager(MockCfgmgr)
- self.cfgmgr.run()
- # MockInit
- self.b10_init = ThreadingServerManager(MockInit)
- self.b10_init.run()
- # MockAuth
- self.auth = ThreadingServerManager(MockAuth)
- self.auth.run()
- self.auth2 = ThreadingServerManager(MockAuth)
- self.auth2.run()
-
-
- def shutdown(self):
- # MockMsgq. We need to wait (blocking) for it, otherwise it'll wipe out
- # a socket for another test during its shutdown.
- self.msgq.shutdown(True)
-
- # We also wait for the others, but these are just so we don't create
- # too many threads in parallel.
-
- # MockAuth
- self.auth2.shutdown(True)
- self.auth.shutdown(True)
- # MockInit
- self.b10_init.shutdown(True)
- # MockCfgmgr
- self.cfgmgr.shutdown(True)
- # remove the unused socket file
- socket_file = self.msgq.server.msgq.socket_file
- try:
- if os.path.exists(socket_file):
- os.remove(socket_file)
- except OSError:
- pass
+ self.start()
More information about the bind10-changes
mailing list