  Data Management / DM-13275

Upload verification job and data blobs to an S3 bucket using Celery

    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We decided to change how SQuaSH handles the data blobs produced by the verification jobs. This is motivated by the performance issues seen in DM-13259 when pushing the HSC test dataset to the new SQuaSH API.

      In this implementation the full verification job is still sent to the SQuaSH API. The measurements and metadata are stored in the MySQL database, while the data blobs are uploaded to an S3 bucket and the corresponding reference (the S3 object URI) is kept in the database. We also upload the whole job document to S3 so that the source of truth is available in case it is needed for a future refactoring.

      This change goes in the direction of the Data Butler composite datasets and is a shim until that functionality is in place.

      It is also interesting from the point of view of data access and visualization in SQuaSH. We could transform and store these data blobs in a more appropriate format such as Parquet, instead of retrieving them as JSON from the REST API as we do now.
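
      As a rough illustration of this split (not the actual SQuaSH code), assuming the lsst.verify job JSON keeps the measurements and blobs under top-level keys:

      # Illustrative sketch only; the key names follow the lsst.verify job JSON
      # format, everything else is a placeholder.
      def split_job(job_doc):
          """Separate the parts stored in MySQL from the parts uploaded to S3."""
          measurements = job_doc.get("measurements", [])  # values and metadata -> MySQL
          blobs = job_doc.get("blobs", [])                # large data blobs    -> S3
          meta = job_doc.get("meta", {})                  # job metadata        -> MySQL
          return measurements, blobs, meta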


        Attachments

          Issue Links

            Activity

            Angelo Fausti added a comment - edited

            The upload of data blobs to S3 is done in the background using Celery. The recommended pattern to integrate Celery with Flask is here: http://flask.pocoo.org/docs/0.12/patterns/celery/. However, that pattern presents a "circular" import issue when integrating Celery with a large Flask app, which is discussed in several posts. Here is a good post about that problem, and there is also this solution presented by Miguel Grinberg.

            I took a more direct approach. Since my Celery tasks don't need to be aware of the Flask application context (for example, they don't require database queries), at least for now, I've created a tasks package in my application following the lines presented here.
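
            A rough sketch of what such a standalone tasks package could look like, assuming a boto3-based upload task and broker settings taken from the environment. Only the task name upload_object is taken from the worker logs below; the module layout and configuration names are illustrative:

            # app/tasks/s3.py (sketch) -- the Celery app is created on its own,
            # without importing the Flask application, so there is no circular import.
            import json
            import os

            import boto3
            from celery import Celery

            # Broker and result backend URLs are assumed to come from the environment.
            celery = Celery(
                "tasks",
                broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379"),
                backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379"),
            )


            @celery.task
            def upload_object(bucket, key, body):
                """Upload a JSON-serializable object to S3 and return its object URI."""
                boto3.client("s3").put_object(
                    Bucket=bucket, Key=key, Body=json.dumps(body).encode()
                )
                return "s3://{}/{}".format(bucket, key)

            The Flask resource can then queue an upload with upload_object.delay(bucket, blob['identifier'], blob) without importing anything from the Flask app itself.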

            Angelo Fausti added a comment

            Refactored the job resource and added error handling and logging. Example of output:

            --------------------------------------------------------------------------------
            ERROR in job [/Users/afausti/Projects/squash-deployment/squash-restful-api/app/api_v1/job.py:155]:
            Metric `validate_drp.AF3_stretch` not found, it looks like the metrics definition is out of date.
            --------------------------------------------------------------------------------
            127.0.0.1 - - [24/Jan/2018 11:08:58] "POST /job HTTP/1.1" 400 -
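
            For context, a minimal sketch of the kind of check that produces the error above, using plain Flask as a stand-in for the actual resource. The metric lookup and route handling are placeholders; the error message and the 400/202 status codes are taken from the logs:

            # Minimal stand-in for the job resource; known_metrics replaces the
            # real database lookup of metric definitions.
            from flask import Flask, jsonify, request

            app = Flask(__name__)
            known_metrics = {"validate_drp.PF1_minimum_gri", "validate_drp.AM1"}  # placeholder


            @app.route("/job", methods=["POST"])
            def post_job():
                job_doc = request.get_json()
                for meas in job_doc.get("measurements", []):
                    name = meas.get("metric")
                    if name not in known_metrics:
                        message = ("Metric `{}` not found, it looks like the metrics "
                                   "definition is out of date.".format(name))
                        app.logger.error(message)
                        return jsonify(message=message), 400
                # ... create the job, measurements and blobs, then queue the S3 uploads
                return jsonify(message="Job accepted"), 202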
            

            Angelo Fausti added a comment - edited

            Added an s3_uri field to the job and blob tables to record the location of the corresponding job document and of the individual data blobs.

            Created a many-to-many relationship between measurements and blobs through the measurement_blob association table to store the blobs associated with each measurement.
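
            A sketch of how these tables could look as SQLAlchemy models. The column names follow the INSERT statements shown in the next comment; the types, lengths and the standalone declarative Base are illustrative, not the actual SQuaSH models:

            # Sketch of the measurement/blob schema described above.
            from sqlalchemy import Column, Float, ForeignKey, Integer, String, Table
            from sqlalchemy.ext.declarative import declarative_base
            from sqlalchemy.orm import relationship

            Base = declarative_base()

            # Association table implementing the many-to-many relationship.
            measurement_blob = Table(
                "measurement_blob", Base.metadata,
                Column("measurement_id", Integer, ForeignKey("measurement.id")),
                Column("blob_id", Integer, ForeignKey("blob.id")),
            )


            class Blob(Base):
                __tablename__ = "blob"
                id = Column(Integer, primary_key=True)
                identifier = Column(String(64))
                name = Column(String(64))
                s3_uri = Column(String(255), nullable=True)  # filled in by the upload task


            class Measurement(Base):
                __tablename__ = "measurement"
                id = Column(Integer, primary_key=True)
                value = Column(Float)
                metric_name = Column(String(64))
                unit = Column(String(16))
                # metric_id and job_id would normally be foreign keys; kept plain here.
                metric_id = Column(Integer)
                job_id = Column(Integer)
                blobs = relationship("Blob", secondary=measurement_blob)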

            Angelo Fausti added a comment

            When the job and blob objects are created, the s3_uri field is left blank; it is updated later by the upload_job and upload_blobs tasks.

            The measurement and the associated blobs are inserted in a single transaction, like this:

            2018-01-25 04:43:48,344 INFO sqlalchemy.engine.base.Engine INSERT INTO measurement (value, metric_name, unit, metric_id, job_id) VALUES (%(value)s, %(metric_name)s, %(unit)s, %(metric_id)s, %(job_id)s)
            2018-01-25 04:43:48,344 INFO sqlalchemy.engine.base.Engine {'value': 41.72932330827068, 'metric_name': 'validate_drp.PF1_minimum_gri', 'unit': '%', 'metric_id': 61, 'job_id': 3}
            2018-01-25 04:43:48,347 INFO sqlalchemy.engine.base.Engine INSERT INTO `blob` (identifier, name, s3_uri) VALUES (%(identifier)s, %(name)s, %(s3_uri)s)
            2018-01-25 04:43:48,347 INFO sqlalchemy.engine.base.Engine {'identifier': 'd34a77ba0b234f6b95dc88cfccacccc6', 'name': 'AnalyticAstrometryModel', 's3_uri': None}
            2018-01-25 04:43:48,350 INFO sqlalchemy.engine.base.Engine INSERT INTO `blob` (identifier, name, s3_uri) VALUES (%(identifier)s, %(name)s, %(s3_uri)s)
            2018-01-25 04:43:48,350 INFO sqlalchemy.engine.base.Engine {'identifier': 'e92c8be5a34c45d1aaad00d5913d27c9', 'name': 'MatchedMultiVisitDataset', 's3_uri': None}
            2018-01-25 04:43:48,351 INFO sqlalchemy.engine.base.Engine INSERT INTO `blob` (identifier, name, s3_uri) VALUES (%(identifier)s, %(name)s, %(s3_uri)s)
            2018-01-25 04:43:48,351 INFO sqlalchemy.engine.base.Engine {'identifier': '47a6adfa7ea14add9c44deb64e4b4988', 'name': 'validate_drp.PF1_minimum_gri', 's3_uri': None}
            2018-01-25 04:43:48,353 INFO sqlalchemy.engine.base.Engine INSERT INTO `blob` (identifier, name, s3_uri) VALUES (%(identifier)s, %(name)s, %(s3_uri)s)
            2018-01-25 04:43:48,353 INFO sqlalchemy.engine.base.Engine {'identifier': '42ddf00268f244748d33f6dbae219371', 'name': 'PhotometricErrorModel', 's3_uri': None}
            2018-01-25 04:43:48,355 INFO sqlalchemy.engine.base.Engine INSERT INTO measurement_blob (measurement_id, blob_id) VALUES (%(measurement_id)s, %(blob_id)s)
            2018-01-25 04:43:48,355 INFO sqlalchemy.engine.base.Engine ({'measurement_id': 30, 'blob_id': 117}, {'measurement_id': 30, 'blob_id': 118}, {'measurement_id': 30, 'blob_id': 119}, {'measurement_id': 30, 'blob_id': 120})
            2018-01-25 04:43:48,356 INFO sqlalchemy.engine.base.Engine COMMIT
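
            Using the model sketch from the previous comment, that single transaction can be reproduced by attaching the blobs to the measurement and committing once; SQLAlchemy then emits the measurement, blob and measurement_blob inserts together. The engine URL is a placeholder and the values are copied from the log above:

            # Illustration of the single-transaction insert; Measurement and Blob
            # are the sketch models from the previous comment.
            from sqlalchemy import create_engine
            from sqlalchemy.orm import sessionmaker

            engine = create_engine("mysql+pymysql://user:secret@localhost/squash")  # placeholder
            Session = sessionmaker(bind=engine)
            session = Session()

            measurement = Measurement(value=41.72932330827068,
                                      metric_name="validate_drp.PF1_minimum_gri",
                                      unit="%", metric_id=61, job_id=3)
            # s3_uri is left as None here; the upload tasks fill it in later.
            measurement.blobs = [
                Blob(identifier="d34a77ba0b234f6b95dc88cfccacccc6", name="AnalyticAstrometryModel"),
                Blob(identifier="e92c8be5a34c45d1aaad00d5913d27c9", name="MatchedMultiVisitDataset"),
            ]

            session.add(measurement)
            session.commit()  # measurement, blobs and association rows in one transaction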
            

            Angelo Fausti added a comment

            Example of upload tasks running in the background through Celery:

            [2018-01-25 04:58:37,595: INFO/ForkPoolWorker-4] Starting new HTTPS connection (1): s3.amazonaws.com
            [2018-01-25 04:58:37,766: INFO/ForkPoolWorker-2] Task app.tasks.s3.upload_object[57099d81-0f25-4f55-9f43-91a6da4e4ec4] succeeded in 1.1368964269931894s: 's3://squash-dev.data/6b43f1a88f854d84873daed2d2b7127c'
            [2018-01-25 04:58:37,896: INFO/ForkPoolWorker-1] Task app.tasks.s3.upload_object[60503fa7-e8cd-4452-8355-c29574871824] succeeded in 0.7496846080175601s: 's3://squash-dev.data/30f8f6dd89bd440bbed6fdf5ac4eb068'
            [2018-01-25 04:58:38,349: INFO/ForkPoolWorker-4] Task app.tasks.s3.upload_object[dc78f7d6-4467-46ed-bae1-07bfac8fb632] succeeded in 0.7646799370122608s: 's3://squash-dev.data/7d1a1b6d920a4f72969fd2f0deff33d9'
            [2018-01-25 04:58:40,623: INFO/ForkPoolWorker-3] Task app.tasks.s3.upload_object[153b333b-1a89-47e8-ab00-de3b7c51d349] succeeded in 3.117821443011053s: 's3://squash-dev.data/aa41d7da79844620ae8cc45ae2845003'

            Angelo Fausti added a comment

            Successful tests with CFHT and HSC data after deployment.
            See also DM-13259.

            $ dispatch_verify.py  --env jenkins --lsstsw $(pwd) --url https://squash-restful-api-demo.lsst.codes/ --user testuser --password testpwd data_hsc_rerun_20170105_HSC-I.json
            verify.bin.dispatchverify.main INFO: Loading data_hsc_rerun_20170105_HSC-I.json
            verify.bin.dispatchverify.main INFO: Refreshing metric definitions from verify_metrics
            verify.bin.dispatchverify.main INFO: Inserting lsstsw package metadata from /Users/afausti/Projects/lsstsw/lsstsw.
            verify.bin.dispatchverify.main INFO: Inserting Jenkins CI environment metadata.
            verify.bin.dispatchverify.main INFO: Uploading Job JSON to https://squash-restful-api-demo.lsst.codes/.
            verify.squash.get INFO: GET https://squash-restful-api-demo.lsst.codes/ status: 200
            verify.squash.post INFO: POST https://squash-restful-api-demo.lsst.codes/auth status: 200
            verify.squash.post INFO: POST https://squash-restful-api-demo.lsst.codes/job status: 202
            

            Angelo Fausti added a comment - https://github.com/lsst-sqre/squash-rest-api/pull/12

              People

              • Assignee: Angelo Fausti
              • Reporter: Angelo Fausti
              • Watchers: Angelo Fausti
              • Votes: 0

                Dates

                • Created:
                • Updated:
                • Resolved:
