[LNT] r255951 - Calculate field changes in background process

Chris Matthews via llvm-commits llvm-commits at lists.llvm.org
Thu Dec 17 17:17:02 PST 2015


Author: cmatthews
Date: Thu Dec 17 19:17:01 2015
New Revision: 255951

URL: http://llvm.org/viewvc/llvm-project?rev=255951&view=rev
Log:
Calculate field changes in background process

Added:
    lnt/trunk/lnt/util/async_ops.py
Modified:
    lnt/trunk/lnt/server/db/fieldchange.py
    lnt/trunk/lnt/server/db/testsuitedb.py
    lnt/trunk/lnt/server/db/v4db.py
    lnt/trunk/lnt/util/ImportData.py

Modified: lnt/trunk/lnt/server/db/fieldchange.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/fieldchange.py?rev=255951&r1=255950&r2=255951&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/fieldchange.py (original)
+++ lnt/trunk/lnt/server/db/fieldchange.py Thu Dec 17 19:17:01 2015
@@ -1,26 +1,20 @@
-
 import sqlalchemy.sql
-
 import lnt.server.reporting.analysis
 from lnt.testing.util.commands import warning
-
-
+from lnt.testing.util.commands import note
 # How many runs backwards to use in the previous run set.
 # More runs are slower (more DB access), but may provide
 # more accurate results.
 FIELD_CHANGE_LOOKBACK = 10
 
-from lnt.testing.util.commands import note
 
-
-def regenerate_fieldchanges_for_run(ts, run):
+def regenerate_fieldchanges_for_run(ts, run_id):
     """Regenerate the set of FieldChange objects for the given run.
     """
-    
     # Allow for potentially a few different runs, previous_runs, next_runs
     # all with the same order_id which we will aggregate together to make
     # our comparison result.
-
+    run = ts.getRun(run_id)
     runs = ts.query(ts.Run). \
         filter(ts.Run.order_id == run.order_id). \
         filter(ts.Run.machine_id == run.machine_id). \
@@ -37,7 +31,7 @@ def regenerate_fieldchanges_for_run(ts,
         end_order = next_runs[-1].order
     else:
         end_order = run.order
-    
+
     # Load our run data for the creation of the new fieldchanges.
     runs_to_load = [r.id for r in (runs + previous_runs)]
 
@@ -84,13 +78,11 @@ def regenerate_fieldchanges_for_run(ts,
                                    field=field)
                 f.test_id = test_id
                 ts.add(f)
-
                 note("Found field change: {}".format(run.machine))
 
-                ts.commit()
-
             # Always update FCs with new values.
             if f:
                 f.old_value = result.previous
                 f.new_value = result.current
                 f.run = run
+    ts.commit()

Modified: lnt/trunk/lnt/server/db/testsuitedb.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/testsuitedb.py?rev=255951&r1=255950&r2=255951&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/testsuitedb.py (original)
+++ lnt/trunk/lnt/server/db/testsuitedb.py Thu Dec 17 19:17:01 2015
@@ -12,7 +12,7 @@ import sqlalchemy
 from sqlalchemy import *
 
 import testsuite
-import lnt.server.db.fieldchange
+from lnt.util import async_ops
 
 class TestSuiteDB(object):
     """
@@ -696,7 +696,7 @@ supplied run is missing required run par
 
             return run,True
 
-    def _importSampleValues(self, tests_data, run, tag):
+    def _importSampleValues(self, tests_data, run, tag, commit):
         # We now need to transform the old schema data (composite samples split
         # into multiple tests with mangling) into the V4DB format where each
         # sample is a complete record.
@@ -766,10 +766,11 @@ test %r does not map to a sample field i
                     self.add(sample)
 
                 sample.set_field(sample_field, value)
-        
-        lnt.server.db.fieldchange.regenerate_fieldchanges_for_run(self, run)
-    
-    def importDataFromDict(self, data, config=None):
+        # No need to calculate fieldchanges at all if we won't commit them.
+        if commit:
+            async_ops.async_fieldchange_calc(self, run)
+
+    def importDataFromDict(self, data, commit, config=None):
         """
         importDataFromDict(data) -> Run, bool
 
@@ -793,8 +794,8 @@ test %r does not map to a sample field i
         # submission. Return the prior Run.
         if not inserted:
             return False, run
-
-        self._importSampleValues(data['Tests'], run, tag)
+        
+        self._importSampleValues(data['Tests'], run, tag, commit)
 
         return True, run
 

Modified: lnt/trunk/lnt/server/db/v4db.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/server/db/v4db.py?rev=255951&r1=255950&r2=255951&view=diff
==============================================================================
--- lnt/trunk/lnt/server/db/v4db.py (original)
+++ lnt/trunk/lnt/server/db/v4db.py Thu Dec 17 19:17:01 2015
@@ -75,7 +75,7 @@ class V4DB(object):
         self.path = path
         self.config = config
         self.baseline_revision = baseline_revision
-
+        self.echo = echo
         with V4DB._engine_lock:
             if path not in V4DB._engine:
                 V4DB._engine[path] = sqlalchemy.create_engine(path, echo=echo)
@@ -132,6 +132,13 @@ class V4DB(object):
         if self.session is not None:
             self.session.close()
 
+    def settings(self):
+        """All the settings needed to recreate this instance elsewhere."""
+        return {'path': self.path,
+                'config': self.config,
+                'baseline_revision': self.baseline_revision,
+                'echo': self.echo}
+
     @property
     def testsuite(self):
         # This is the start of "magic" part of V4DB, which allows us to get
@@ -159,7 +166,7 @@ class V4DB(object):
         return sum([ts.query(ts.Test).count()
                     for ts in self.testsuite.values()])
 
-    def importDataFromDict(self, data, config=None):
+    def importDataFromDict(self, data, commit, config=None):
         # Select the database to import into.
         #
         # FIXME: Promote this to a top-level field in the data.
@@ -172,4 +179,4 @@ class V4DB(object):
             raise ValueError,"test suite %r not present in this database!" % (
                 db_name)
 
-        return db.importDataFromDict(data, config)
+        return db.importDataFromDict(data, commit, config)

Modified: lnt/trunk/lnt/util/ImportData.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/util/ImportData.py?rev=255951&r1=255950&r2=255951&view=diff
==============================================================================
--- lnt/trunk/lnt/util/ImportData.py (original)
+++ lnt/trunk/lnt/util/ImportData.py Thu Dec 17 19:17:01 2015
@@ -71,7 +71,7 @@ def import_and_report(config, db_name, d
 
     importStartTime = time.time()
     try:
-        success,run = db.importDataFromDict(data, config=db_config)
+        success, run = db.importDataFromDict(data, commit, config=db_config)
     except KeyboardInterrupt:
         raise
     except:

Added: lnt/trunk/lnt/util/async_ops.py
URL: http://llvm.org/viewvc/llvm-project/lnt/trunk/lnt/util/async_ops.py?rev=255951&view=auto
==============================================================================
--- lnt/trunk/lnt/util/async_ops.py (added)
+++ lnt/trunk/lnt/util/async_ops.py Thu Dec 17 19:17:01 2015
@@ -0,0 +1,67 @@
+"""Asynchronous operations for LNT.
+
+For big tasks it is nice to be able to run in the background.  This module
+contains wrappers to run particular LNT tasks in subprocesses.
+
+Because multiprocessing cannot directly use the LNT test-suite objects in
+subprocesses (because they are not serializable because they don't have a fixed
+package in the system, but are generated on program load) we recreate the test
+suite that we need inside each subprocess before we execute the work job.
+"""
+import logging
+from flask import current_app
+import sys
+import lnt.server.db.fieldchange as fieldchange
+import lnt.server.db.v4db
+import traceback
+import multiprocessing
+from multiprocessing import Pool
+
+NUM_WORKERS = 2  # The number of subprocesses to spawn per LNT process.
+WORKERS = None  # The worker pool.
+
+
+def launch_workers():
+    """Make sure we have a worker pool ready to queue."""
+    global WORKERS
+    if not WORKERS:
+        logger = multiprocessing.log_to_stderr()
+        logger.setLevel(logging.INFO)
+        WORKERS = Pool(NUM_WORKERS)
+
+
+def async_fieldchange_calc(ts, run):
+    """Run regenerate field changes in the background."""
+    func_args = {'run_id': run.id}
+    #  Make sure this run is in the database!
+    ts.commit()
+    async_run_job(fieldchange.regenerate_fieldchanges_for_run,
+                  ts,
+                  func_args)
+
+
+def async_run_job(job, ts, func_args):
+    """Send a job to the async wrapper in the subprocess."""
+    # If the run is not in the database, we can't do anything more.
+    print "Queuing background job to process fieldchanges"
+    args = {'tsname': ts.name,
+            'db': ts.v4db.settings()}
+    launch_workers()
+    job = WORKERS.apply_async(async_wrapper, [job, args, func_args])
+    job.get()
+
+
+def async_wrapper(job, ts_args, func_args):
+    """Setup test-suite in this subprocess and run something."""
+    try:
+        print >> sys.stderr, "Running async wrapper"
+        logging.info(str(job))
+        _v4db = lnt.server.db.v4db.V4DB(**ts_args['db'])
+        ts = _v4db.testsuite[ts_args['tsname']]
+        logging.info("Calculating field changes for ")
+        job(ts, **func_args)
+        logging.info("Done calculating field changes")
+    except:
+        # Put all exception text into an exception and raise that for our
+        # parent process.
+        raise Exception("".join(traceback.format_exception(*sys.exc_info())))




More information about the llvm-commits mailing list