[LNT] r255966 - Little deployment, logging and background process fixes

Chris Matthews via llvm-commits llvm-commits at lists.llvm.org
Thu Dec 17 18:09:59 PST 2015


Author: cmatthews
Date: Thu Dec 17 20:09:59 2015
New Revision: 255966

URL: http://llvm.org/viewvc/llvm-project?rev=255966&view=rev
Log:
Little deployment, logging and background process fixes

- Subprocesses share memory log.
- No need to limit requests to 300, that is actually far too often.
- Add timers to the slowest code so we will find out if they are too
slow in production.
- Don't calculate changes unless --commit=True, that is unneeded work.
- Allow the databases to be ripped down and started again.  Needed
for unit tests that run on the same database, and for subprocesses.
- Sometimes database loads fail if the file is locked; a quick retry
always seems to fix it.
- No need to flush the memory logger regularly, we only use it in
memory.
- Always show how many processes are in the queue when log is rendered.
- Debugging endpoint to check if debugging is enabled (it should never
be in production!!).
- Memory buffer logging events don't have time, so don't render it.

Modified:
    lnt/trunk/Procfile
    lnt/trunk/lnt/server/db/fieldchange.py
    lnt/trunk/lnt/server/db/rules/rule_update_fixed_regressions.py
    lnt/trunk/lnt/server/db/testsuitedb.py
    lnt/trunk/lnt/server/db/v4db.py
    lnt/trunk/lnt/server/ui/app.py
    lnt/trunk/lnt/server/ui/templates/log.html
    lnt/trunk/lnt/server/ui/views.py
    lnt/trunk/lnt/testing/util/commands.py
    lnt/trunk/lnt/util/ImportData.py
    lnt/trunk/lnt/util/async_ops.py

Modified: lnt/trunk/Procfile
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/Procfile?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/Procfile (original)
+++ lnt/trunk/Procfile Thu Dec 17 20:09:59 2015
@@ -1 +1 @@
-web: gunicorn deployment.app_wrapper:app --log-file - --timeout 300 --max-requests 100 --preload
+web: gunicorn app_wrapper:app --log-file - --timeout 300

Modified: lnt/trunk/lnt/server/db/fieldchange.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/fieldchange.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/fieldchange.py (original)
+++ lnt/trunk/lnt/server/db/fieldchange.py Thu Dec 17 20:09:59 2015
@@ -2,7 +2,7 @@ import re
 import sqlalchemy.sql
 import lnt.server.reporting.analysis
 from lnt.testing.util.commands import warning
-from lnt.testing.util.commands import note
+from lnt.testing.util.commands import note, timed
 from lnt.server.db.regression import new_regression, RegressionState
 from lnt.server.db.regression import get_ris
 from lnt.server.db.regression import rebuild_title
@@ -14,6 +14,12 @@ from lnt.server.db import rules_manager
 FIELD_CHANGE_LOOKBACK = 10
 
 
+def post_submit_tasks(ts, run_id):
+    regenerate_fieldchanges_for_run(ts, run_id)
+
+
+
+@timed
 def regenerate_fieldchanges_for_run(ts, run_id):
     """Regenerate the set of FieldChange objects for the given run.
     """
@@ -99,7 +105,6 @@ def regenerate_fieldchanges_for_run(ts,
     rules.post_submission_hooks(ts, regressions)
 
 
-
 def is_overlaping(fc1, fc2):
     """"Returns true if these two orders intersect. """
     r1_min = fc1.start_order
@@ -109,7 +114,7 @@ def is_overlaping(fc1, fc2):
     return (r1_min == r2_min and r1_max == r2_max) or \
            (r1_min < r2_max and r2_min < r1_max)
 
-
+@timed
 def identify_related_changes(ts, regressions, fc):
     """Can we find a home for this change in some existing regression? """
     for regression in regressions:

Modified: lnt/trunk/lnt/server/db/rules/rule_update_fixed_regressions.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/rules/rule_update_fixed_regressions.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/rules/rule_update_fixed_regressions.py (original)
+++ lnt/trunk/lnt/server/db/rules/rule_update_fixed_regressions.py Thu Dec 17 20:09:59 2015
@@ -4,7 +4,7 @@ Staged or Active + fixed -> Verify
 """
 from lnt.server.db.regression import RegressionState
 from lnt.server.db.regression import get_cr_for_field_change, get_ris
-from lnt.testing.util.commands import note
+from lnt.testing.util.commands import note, timed
 
 def _fixed_rind(ts, rind):
     """Is this regression indicator fixed?"""
@@ -26,7 +26,7 @@ def is_fixed(ts, regression):
     return all(fixes)
 
 
-
+@timed
 def regression_evolution(ts, run_id):
     """Analyse regressions. If they have changes, process them.
     Look at each regression in state detect.  Move to ignore if it is fixed.

Modified: lnt/trunk/lnt/server/db/testsuitedb.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/testsuitedb.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/testsuitedb.py (original)
+++ lnt/trunk/lnt/server/db/testsuitedb.py Thu Dec 17 20:09:59 2015
@@ -12,7 +12,7 @@ import sqlalchemy
 from sqlalchemy import *
 
 import testsuite
-from lnt.util import async_ops
+
 
 class TestSuiteDB(object):
     """
@@ -767,9 +767,6 @@ test %r does not map to a sample field i
                     self.add(sample)
 
                 sample.set_field(sample_field, value)
-        # No need to calculate fieldchanges at all if we won't commit them.
-        if commit:
-            async_ops.async_fieldchange_calc(self, run)
 
     def importDataFromDict(self, data, commit, config=None):
         """

Modified: lnt/trunk/lnt/server/db/v4db.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/v4db.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/v4db.py (original)
+++ lnt/trunk/lnt/server/db/v4db.py Thu Dec 17 20:09:59 2015
@@ -132,12 +132,19 @@ class V4DB(object):
         if self.session is not None:
             self.session.close()
     
-    def close_engine(self):
+    @staticmethod    
+    def close_engine(db_path):
         """Rip down everything about this path, so we can make it
         new again. This is used for tests that need to make a fresh
         in memory database."""
-        self._engine.pop(self.path)
-        V4DB._db_updated.remove(self.path)
+        V4DB._engine[db_path].dispose()
+        V4DB._engine.pop(db_path)
+        #V4DB._db_updated.remove(db_path)
+    
+    @staticmethod
+    def close_all_engines():
+        for key in V4DB._engine.keys():
+            V4DB.close_engine(key)
 
     def settings(self):
         """All the setting needed to recreate this instnace elsewhere."""

Modified: lnt/trunk/lnt/server/ui/app.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/ui/app.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/ui/app.py (original)
+++ lnt/trunk/lnt/server/ui/app.py Thu Dec 17 20:09:59 2015
@@ -9,6 +9,7 @@ import StringIO
 
 import flask
 from flask import current_app
+from flask import request
 from flask import g
 from flask import url_for
 from flask import Flask
@@ -25,6 +26,7 @@ import lnt.server.ui.regression_views
 from lnt.server.ui.api import load_api_resources
 import lnt.server.db.rules_manager
 
+from sqlalchemy.exc import DatabaseError
 
 class RootSlashPatchMiddleware(object):
     def __init__(self, app):
@@ -48,7 +50,6 @@ class Request(flask.Request):
         return time.time() - self.request_time
 
     # Utility Methods
-
     def get_db(self):
         """
         get_db() -> <db instance>
@@ -59,9 +60,10 @@ class Request(flask.Request):
 
         if self.db is None:
             echo = bool(self.args.get('db_log') or self.form.get('db_log'))
-
-            self.db = current_app.old_config.get_database(g.db_name, echo=echo)
-
+            try:
+                self.db = current_app.old_config.get_database(g.db_name, echo=echo)
+            except DatabaseError:
+                self.db = current_app.old_config.get_database(g.db_name, echo=echo)
             # Enable SQL logging with db_log.
             #
             # FIXME: Conditionalize on an is_production variable.
@@ -168,7 +170,7 @@ class App(flask.Flask):
         self.logger.addHandler(ch)
         
         # Log to mem for the /log view.
-        h = logging.handlers.MemoryHandler(1024 * 1024)
+        h = logging.handlers.MemoryHandler(1024 * 1024, flushLevel=logging.CRITICAL)
         h.setLevel(logging.DEBUG)
         self.logger.addHandler(h)
         # Also store the logger, so we can render the buffer in it.

Modified: lnt/trunk/lnt/server/ui/templates/log.html
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/ui/templates/log.html?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/ui/templates/log.html (original)
+++ lnt/trunk/lnt/server/ui/templates/log.html Thu Dec 17 20:09:59 2015
@@ -1,3 +1,4 @@
+{% set nosidebar = True %}
 {% import "utils.html" as utils %}
 
 {% extends "layout.html" %}
@@ -12,7 +13,6 @@
     <tr>
       <th>Kind</th>
       <th>Location</th>
-      <th>Time</th>
       <th>Message<th>
     </tr>
   </thead>
@@ -27,7 +27,6 @@
     {% endif %}
       <td>{{ item.levelname}}</td>
       <td>{{item.filename}}:{{item.lineno}}</td>
-      <td>{{ item.asctime}}</td>
       <td>{{ item.msg }}</td>
     </tr>
 {% endfor %}

Modified: lnt/trunk/lnt/server/ui/views.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/ui/views.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/server/ui/views.py (original)
+++ lnt/trunk/lnt/server/ui/views.py Thu Dec 17 20:09:59 2015
@@ -31,6 +31,7 @@ import lnt.server.reporting.dailyreport
 import lnt.server.reporting.summaryreport
 import lnt.server.db.rules_manager
 from collections import namedtuple
+from lnt.util import async_ops
 
 integral_rex = re.compile(r"[\d]+")
 
@@ -1172,5 +1173,10 @@ def rules():
 
 @frontend.route('/log')
 def log():
+    async_ops.check_workers()
     note("Showing log page.")
     return render_template("log.html")
+
+@frontend.route('/debug')
+def debug():
+    assert current_app.debug == False

Modified: lnt/trunk/lnt/testing/util/commands.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/testing/util/commands.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/testing/util/commands.py (original)
+++ lnt/trunk/lnt/testing/util/commands.py Thu Dec 17 20:09:59 2015
@@ -7,23 +7,39 @@ import inspect
 import os
 import sys
 import logging
+import time
 from flask import current_app
 # FIXME: Find a better place for this code.
 
+
 def getLogger():
-    try:
-         logger = current_app.logger
-    except RuntimeError:
-        print "Using other logger."
-        logger = logging.getLogger("LNT")
+    logger = logging.getLogger("lnt.server.ui.app")
     return logger
+
 note = lambda message: getLogger().info(message)
 warning = lambda message: getLogger().warning(message)
 error = lambda message: getLogger().error(message)
 
+def timed(func):
+    def timed(*args, **kw):
+        t_start = time.time()
+        result = func(*args, **kw)
+        t_end = time.time()
+        short_args = repr(args)
+        if len(short_args) > 80:
+            short_args = short_args[0:80]
+        delta = t_end - t_start
+        msg = '%r (%s, %r) %2.2f sec' % (func.__name__, short_args, kw, delta)
+        if delta > 10:
+            warning(msg)
+        else:
+            note(msg)
+        return result
+
+    return timed
 
 def fatal(message):
-    logging.getLogger('LNT').critical(message)
+    getLogger().critical(message)
     sys.exit(1)
 
 

Modified: lnt/trunk/lnt/util/ImportData.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/util/ImportData.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/util/ImportData.py (original)
+++ lnt/trunk/lnt/util/ImportData.py Thu Dec 17 20:09:59 2015
@@ -5,7 +5,7 @@ import lnt.formats
 import lnt.server.reporting.analysis
 from lnt.testing.util.commands import note
 from lnt.util import NTEmailReport
-
+from lnt.util import async_ops
 
 def import_and_report(config, db_name, db, file, format, commit=False,
                       show_sample_count=False, disable_email=False,
@@ -107,12 +107,16 @@ def import_and_report(config, db_name, d
         result['added_samples'] = db.getNumSamples() - numSamples
 
     result['committed'] = commit
+    ts_name = data['Run']['Info'].get('tag')
     if commit:
         db.commit()
+        ts = db.testsuite.get(ts_name)
+        async_ops.async_fieldchange_calc(ts, run)
+
     else:
         db.rollback()
     # Add a handy relative link to the submitted run.
-    ts_name = data['Run']['Info'].get('tag')
+
     result['result_url'] = "db_{}/v4/{}/{}".format(db_name, ts_name, run.id)
     result['report_time'] = time.time() - importStartTime
     result['total_time'] = time.time() - startTime

Modified: lnt/trunk/lnt/util/async_ops.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/util/async_ops.py?rev=255966&r1=255965&r2=255966&view=diff
==============================================================================
--- lnt/trunk/lnt/util/async_ops.py (original)
+++ lnt/trunk/lnt/util/async_ops.py Thu Dec 17 20:09:59 2015
@@ -8,82 +8,127 @@ subprocesses (because they are not seria
 package in the system, but are generated on program load) we recreate the test
 suite that we need inside each subprocess before we execute the work job.
 """
+import os
+import time
 import logging
-from flask import current_app
+from flask import current_app, g
 import sys
 import lnt.server.db.fieldchange as fieldchange
 import lnt.server.db.v4db
 import traceback
+import contextlib
 import multiprocessing
-from multiprocessing import Pool, TimeoutError
-from lnt.testing.util.commands import note
-NUM_WORKERS = 2  # The number of subprocesses to spawn per LNT process.
+from multiprocessing import Pool, TimeoutError, Manager, Process
+from threading import Lock
+from lnt.testing.util.commands import note, warning, timed, error
+NUM_WORKERS = 4  # The number of subprocesses to spawn per LNT process.
 WORKERS = None  # The worker pool.
 
+JOBS = []
+
+
 
 def launch_workers():
     """Make sure we have a worker pool ready to queue."""
     global WORKERS
     if not WORKERS:
-        logger = multiprocessing.log_to_stderr()
-        logger.setLevel(logging.INFO)
         note("Starting workers")
-        WORKERS = Pool(NUM_WORKERS)
-
+        manager = Manager()            
+        current_app.config['mem_logger'].buffer = manager.list(current_app.config['mem_logger'].buffer)
+    
 
 def async_fieldchange_calc(ts, run):
     """Run regenerate field changes in the background."""
     func_args = {'run_id': run.id}
     #  Make sure this run is in the database!
-    ts.commit()
-    async_run_job(fieldchange.regenerate_fieldchanges_for_run,
+    async_run_job(fieldchange.post_submit_tasks,
                   ts,
                   func_args)
 
-
+def check_workers():
+    global JOBS
+    JOBS = [x for x in JOBS if x.is_alive()]
+    still_running = len(JOBS)
+    msg = "{} Job(s) in the queue.".format(still_running)
+    if still_running > 5:
+        # This could be run outside of the application context, so use
+        # full logger name.
+        logging.getLogger("lnt.server.ui.app").warning(msg)
+    elif still_running > 0:
+        logging.getLogger("lnt.server.ui.app").info(msg)
+    else:
+        logging.getLogger("lnt.server.ui.app").info("Job queue empty.")
+    #print [x for x in JOBS if not x.successful()]
+        
 def async_run_job(job, ts, func_args):
     """Send a job to the async wrapper in the subprocess."""
     # If the run is not in the database, we can't do anything more.
-    note("Queuing background job to process fieldchanges")
-    args = {'tsname': ts.name,
-            'db': ts.v4db.settings()}
+    note("Queuing background job to process fieldchanges " + str(os.getpid()))
     launch_workers()
-    job = WORKERS.apply_async(async_wrapper,
-                              [job, args, func_args],
-                              callback=async_job_finished)
-    # Lets see if we crash right away?
-    try:
-        job.get(timeout=1)
-    except TimeoutError:
-        pass
-
+    check_workers()
+    args = {'tsname': ts.name,
+            'db': g.db_name}
+    job = Process(target=async_wrapper,
+                  args=[job, args, func_args])
 
+    job.start()
+    # Lets see if we crash right away?
+    # try:
+    #     stuff = job.join(timeout=1)
+    #     assert False, "This should aways fail."
+    # except TimeoutError:
+    JOBS.append(job)
+
+# Flag to track if we have disposed of the parents database connections in
+# this subprocess.
+clean_db = False
+lock = Lock()
 def async_wrapper(job, ts_args, func_args):
     """Setup test-suite in this subprocess and run something.
     
     Because of multipocessing, capture excptions and log messages,
     and return them.
     """
+    global clean_db, lock
     try:
-        print >>sys.stderr,"Test"
-        h = logging.handlers.MemoryHandler(1024 * 1024)
-        h.setLevel(logging.DEBUG)
-        logging.getLogger('LNT').addHandler(h)
-        note("Running async wrapper: {}".format(job.__name__))
-        _v4db = lnt.server.db.v4db.V4DB(**ts_args['db'])
+
+        start_time = time.time()
+        
+        if not clean_db:
+            lnt.server.db.v4db.V4DB.close_all_engines()
+            clean_db = True
+        
+        note("Running async wrapper: {} ".format(job.__name__)+ str(os.getpid()))
+
+        _v4db = current_app.old_config.get_database(ts_args['db'])
+        #with contextlib.closing(_v4db) as db:
         ts = _v4db.testsuite[ts_args['tsname']]
-        job(ts, **func_args)
+        nothing = job(ts, **func_args)
+        assert nothing is None
+        end_time = time.time()
+        delta = end_time-start_time
+        msg = "Finished: {name} in {time:.2f}s ".format(name=job.__name__,
+                                                time=delta)
+        if delta < 100:
+            note(msg)
+        else:
+            warning(msg)
     except:
         # Put all exception text into an exception and raise that for our
         # parent process.
-        return Exception("".join(traceback.format_exception(*sys.exc_info())))
-    return h.buffer
-
-
-def async_job_finished(arg):
-    if isinstance(arg, Exception):
-        raise arg
-    if isinstance(arg, list):
-        for log_entry in arg:
-            logging.getLogger('LNT').handle(log_entry)
-    
+        error("".join(traceback.format_exception(*sys.exc_info())))
+        sys.exit(1)
+    sys.exit(0)
+
+
+def make_callback():
+    app = current_app
+    def async_job_finished(arg):
+        if isinstance(arg, Exception):
+            logging.getLogger("lnt.server.ui.app").error(str(arg))
+            raise arg
+        if isinstance(arg, list):
+            for log_entry in arg:
+                logging.getLogger("lnt.server.ui.app").handle(log_entry)
+        check_workers()
+    return async_job_finished




More information about the llvm-commits mailing list