Data Management / DM-19536

Evaluate Apache Cassandra as PPDB back-end option


    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: dax_ppdb
    • Labels:
    • Story Points: 16
    • Sprint: DB_S19_04, DB_S19_05
    • Team: Data Access and Database

      Description

      As discussed at the db-team Wednesday meeting, it's time to start looking at NoSQL options for PPDB back-ends. Several candidates were mentioned, with Cassandra probably being the most promising.

      I'm going to study everything Cassandra and try to implement PPDB with it. We do not have a reasonable-scale testing platform for it, but I can do at least some basic testing using either the NCSA cluster or even my desktop.

        Attachments

          Issue Links

            Activity

            salnikov Andy Salnikov added a comment -

            Partitioning of the tables.

            There are just two major problems that need to be solved to make Cassandra perform efficiently - selecting good partitioning and good ordering of the data (as saved in SSTables). This boils down to selecting the columns for the primary key (the partition key is the first part of the PK; the clustering key is the remaining columns).

            Selecting one or a few partitions out of many is a very efficient way to limit the amount of data being processed. Cassandra only supports an equality relation on the partition key, or an IN operation on the last (or only) column of the partition key. This means one usually has to write multiple queries, e.g. one query per partition if the remaining selection differs from partition to partition. This in turn can cause overhead if the number of partitions is large.

            Selection on the clustering key has to be done so that one query selects a contiguous range of keys (from a given partition). This also means that some complex queries will have to be split into multiple queries.

            Choosing the right keys depends entirely on the queries that we run on the tables, so I'm going to summarize these.

            DiaObject(Last) table

            The query for this table is "select the latest version of every DiaObject in a region covered by one CCD (or maybe an arbitrary region)". If we have a separate DiaObjectLast table then it already contains only the latest versions. With only the DiaObject table I don't think we can achieve the same thing in a reasonable way - the "last" flag changes, so obviously it cannot be part of the partition or clustering key; we'd have to read a lot of records and filter them, which would kill performance quickly.

            So far we have had spatial queries in PPDB based on an HTM index; for Cassandra it makes sense to use the same approach or another pixelization, e.g. Q3C. For partitioning purposes we want relatively large "pixels", comparable in size to a CCD (see above), so the partitioning column could be HTM level=8, for example. The normal selection is still based on pixelId (htm20) using ranges of those. Because we select ranges of pixelIds, it is natural to use pixelId as a clustering key.
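
            As an illustration only, a minimal sketch of how partId (htm8) and pixelId (htm20) might be computed from ra/dec; it assumes the lsst.sphgeom HtmPixelization API, and the function name is made up for this sketch:

            # Sketch: compute partition (htm8) and clustering (htm20) pixel indices
            # from a position given in degrees, assuming lsst.sphgeom is available.
            from lsst.sphgeom import HtmPixelization, LonLat, UnitVector3d

            HTM8 = HtmPixelization(8)    # coarse pixels -> partId (one partition per pixel)
            HTM20 = HtmPixelization(20)  # fine pixels -> pixelId (clustering key)

            def pixel_ids(ra, dec):
                """Return (partId, pixelId) for a position in degrees."""
                v = UnitVector3d(LonLat.fromDegrees(ra, dec))
                return HTM8.index(v), HTM20.index(v)

            part_id, pixel_id = pixel_ids(60.0, -30.0)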

            Altogether it may look like

            CREATE TABLE DiaObjectLast (
                partId int,           // htm8 of ra, dec
                pixelId int,          // htm20 of ra, dec
                diaObjectId int,
                ...,
                primary key (partId, pixelId, diaObjectId)
            );
             
            // series of queries for one visit
            SELECT * from DiaObjectLast WHERE partId = 1 AND pixelId >= 10000 AND pixelId < 12300;
            SELECT * from DiaObjectLast WHERE partId = 1 AND pixelId >= 14000 AND pixelId < 15200;
            SELECT * from DiaObjectLast WHERE partId = 2 AND pixelId >= 21000 AND pixelId < 21700;
            ...
            

            (with the caveat that HTM indices must not change after position updates - they should be computed from the ra/dec of the first version of the DiaObject).

            One potentially interesting idea is to make pixelId a part of diaObjectId (in the higher bits, so that spatial locality is preserved); then we could guarantee that pixelId is stable and cannot change.
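
            A minimal sketch of what such packing could look like; the 32-bit width reserved for the per-pixel counter is an arbitrary assumption for illustration:

            # Sketch: pack pixelId (htm20) into the high bits of diaObjectId so the
            # pixel can always be recovered from the object ID and never changes.
            COUNTER_BITS = 32  # assumed width of the per-pixel sequence counter

            def make_dia_object_id(pixel_id, counter):
                """Combine a stable pixelId (high bits) with a per-pixel counter (low bits)."""
                return (pixel_id << COUNTER_BITS) | (counter & ((1 << COUNTER_BITS) - 1))

            def pixel_id_of(dia_object_id):
                """Recover the pixelId from a packed diaObjectId."""
                return dia_object_id >> COUNTER_BITS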

            Dia(Forced)Source table

            There is some freedom in choosing which query is used to retrieve data from these tables - if the set of diaObjectIds is known then one can select based on that (and for relational databases this seems to work better than the other approach); otherwise the same region-based select as for DiaObjectLast should be used. Additionally, we only need to select the last several months (12) of sources from these tables.

            I think region-based selection should be preferred because it can run in parallel with the DiaObjectLast select (if we have enough capacity). That would make partitioning and clustering based on HTM, similarly to DiaObjectLast. The difference is the additional time constraint.

            I don't think it would be possible to use a timestamp in a clustering key without blowing up the number of queries (a 2-D space/time range selection is inherently non-contiguous). I could imagine something like this, though, which would result in a factor of ~10 more queries than in the above case:

            CREATE TABLE DiaSource (
                partId int,           // htm8 of ra, dec
                pixelId int,          // htm20 of ra, dec
                month int,            // year and month combined, e.g. 202010 for Oct 2020
                diaSourceId int,
                ...,
                primary key (partId, pixelId, month, diaSourceId)
            );
             
            // series of queries for one visit, just for two months
            SELECT * from DiaSource WHERE partId = 1 AND pixelId >= 10000 AND pixelId < 12300 AND month = 202010;
            SELECT * from DiaSource WHERE partId = 1 AND pixelId >= 10000 AND pixelId < 12300 AND month = 202011;
            SELECT * from DiaSource WHERE partId = 1 AND pixelId >= 14000 AND pixelId < 15200 AND month = 202010;
            SELECT * from DiaSource WHERE partId = 1 AND pixelId >= 14000 AND pixelId < 15200 AND month = 202011;
            SELECT * from DiaSource WHERE partId = 2 AND pixelId >= 21000 AND pixelId < 21700 AND month = 202010;
            SELECT * from DiaSource WHERE partId = 2 AND pixelId >= 21000 AND pixelId < 21700 AND month = 202011;
            ...
            

            Another option is to use "month" as the second part of the partition key; Cassandra allows the IN operator on the last column of the partition key, so we can do something like:

            CREATE TABLE DiaSource (
                partId int,           // htm8 of ra, dec
                month int,            // year and month combined, e.g. 202010 for Oct 2020
                pixelId int,          // htm20 of ra, dec
                diaSourceId int,
                ...,
                primary key ((partId, month), pixelId, diaSourceId)
            );
             
            // series of queries for one visit, just for two months
            SELECT * from DiaSource WHERE partId = 1 AND month IN (202010, 202011, 202012) AND pixelId >= 10000 AND pixelId < 12300;
            SELECT * from DiaSource WHERE partId = 1 AND month IN (202010, 202011, 202012) AND pixelId >= 14000 AND pixelId < 15200;
            SELECT * from DiaSource WHERE partId = 2 AND month IN (202010, 202011, 202012) AND pixelId >= 21000 AND pixelId < 21700;
            ...
            

            I want to try the latter variant to see if it actually works.
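
            For reference, a minimal sketch of how such a series of per-partition queries might be issued concurrently with the Python driver; the table and column names follow the schema above, while the host, function name and "ranges" argument are just placeholders for this sketch:

            # Sketch: fan out one asynchronous query per (partId, pixelId range) and
            # gather the results, assuming the DataStax cassandra-driver.
            from cassandra.cluster import Cluster
            from cassandra.query import ValueSequence

            session = Cluster(["127.0.0.1"]).connect("PPDB")

            QUERY = ("SELECT * FROM DiaSource "
                     "WHERE partId = %s AND month IN %s AND pixelId >= %s AND pixelId < %s")

            def select_sources(ranges, months):
                """ranges: iterable of (partId, pixel_begin, pixel_end); months: list of ints."""
                futures = [session.execute_async(QUERY, (part, ValueSequence(months), begin, end))
                           for part, begin, end in ranges]
                rows = []
                for future in futures:
                    rows.extend(future.result())
                return rows

            rows = select_sources([(1, 10000, 12300), (1, 14000, 15200), (2, 21000, 21700)],
                                  [202010, 202011, 202012])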

            salnikov Andy Salnikov added a comment -

            I'm running the prototype with the above partitioning scheme on my personal VM and it looks more or less stable, though there are occasional crashes due to timeouts. One example is:

            Traceback (most recent call last):
              File "/home/salnikov/DM-19536/l1dbproto/bin/ap_proto.py", line 28, in <module>
                rc = app.run()
              File "/home/salnikov/DM-19536/l1dbproto/python/lsst/l1dbproto/ap_proto.py", line 268, in run
                self.visit(db, visit_id, dt, region, sources, indices)
              File "/home/salnikov/DM-19536/l1dbproto/python/lsst/l1dbproto/ap_proto.py", line 438, in visit
                latest_objects = db.getDiaObjects(ranges)
              File "/home/salnikov/DM-19536/dax_ppdb/python/lsst/dax/ppdb/ppdbCassandra.py", line 338, in getDiaObjects
                rows = future.result()
              File "cassandra/cluster.py", line 4131, in cassandra.cluster.ResponseFuture.result
            cassandra.OperationTimedOut: errors={'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1
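
            This one is the driver's client-side request timeout; as the message hints, it can presumably be raised per query or for the whole session. A minimal sketch, assuming the DataStax cassandra-driver (the 120-second value is arbitrary):

            # Sketch: raise the client request timeout (the driver default is 10 s).
            from cassandra.cluster import Cluster

            session = Cluster(["127.0.0.1"]).connect("PPDB")
            session.default_timeout = 120.0  # used when execute()/execute_async() gets no explicit timeout

            rows = session.execute(
                "SELECT * FROM DiaObjectLast WHERE partId = 1 AND pixelId >= 10000 AND pixelId < 12300",
                timeout=120.0)  # or override per request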
            

            Another crash is due to the batch size limit:

            2019-05-19 23:48:25,486 [INFO] dax.ppdb.ppdbCassandra: DiaObject: will store 37438 records
            2019-05-19 23:48:27,760 [WARNING] cassandra.protocol: Server warning: Batch for [PPDB.DiaObject] is of size 5540824, exceeding specified threshold of 5120000 by 420824.
            Traceback (most recent call last):
              File "/home/salnikov/DM-19536/l1dbproto/bin/ap_proto.py", line 28, in <module>
                rc = app.run()
              File "/home/salnikov/DM-19536/l1dbproto/python/lsst/l1dbproto/ap_proto.py", line 268, in run
                self.visit(db, visit_id, dt, region, sources, indices)
              File "/home/salnikov/DM-19536/l1dbproto/python/lsst/l1dbproto/ap_proto.py", line 475, in visit
                db.storeDiaObjects(objects, dt)
              File "/home/salnikov/DM-19536/dax_ppdb/python/lsst/dax/ppdb/ppdbCassandra.py", line 504, in storeDiaObjects
                self._storeObjectsAfw(objs, "DiaObject", extra_columns=extra_columns)
              File "/home/salnikov/DM-19536/dax_ppdb/python/lsst/dax/ppdb/ppdbCassandra.py", line 694, in _storeObjectsAfw
                self._session.execute(queries)
              File "cassandra/cluster.py", line 2217, in cassandra.cluster.Session.execute
              File "cassandra/cluster.py", line 4131, in cassandra.cluster.ResponseFuture.result
            cassandra.WriteTimeout: Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'consistency': 'LOCAL_ONE', 'required_responses': 1, 'received_responses': 0, 'write_type': 'BATCH'}
            

            The latter is probably mostly due to the large number of records we store at once; with proper per-CCD processing that number should be much lower, but OTOH we'll have much wider records.
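
            One possible workaround for the batch size warning/timeout is to split the inserts into several smaller batches instead of one huge one; a rough sketch, assuming the cassandra-driver BatchStatement API (the chunk size is arbitrary):

            # Sketch: execute INSERTs in bounded-size unlogged batches.
            from cassandra.query import BatchStatement, BatchType, SimpleStatement

            def store_in_batches(session, statements_and_params, chunk_size=500):
                """statements_and_params: iterable of (cql_string, params) pairs."""
                batch = BatchStatement(batch_type=BatchType.UNLOGGED)
                count = 0
                for cql, params in statements_and_params:
                    batch.add(SimpleStatement(cql), params)
                    count += 1
                    if count >= chunk_size:
                        session.execute(batch)
                        batch = BatchStatement(batch_type=BatchType.UNLOGGED)
                        count = 0
                if count:
                    session.execute(batch)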

            salnikov Andy Salnikov added a comment -

            The prototype works "OK" on my desktop in a VM. Performance is not good, of course; I only have one shared spinning disk on my desktop, so I don't expect anything stellar. I need a more realistic environment (more than one server, optimally with SSDs) to test it further; I will open a new ticket for this.

            The API for the Cassandra PPDB is somewhat different from the SQL one. I am not ready yet to start messing with master (the PPDB API will have to change anyway), so I'll continue development and testing on my personal branch u/andy-slac/cassandra for both dax_ppdb and l1dbproto.

            I want to attach some plots before I close this ticket, so I will keep it open for a little while.

            salnikov Andy Salnikov added a comment -

            Some plots as promised. I don't think we can derive much useful info from them, but here they are anyway.

            [Plots: total per-visit time; total select and total store times; select time per table; store time per table.]
            On the last plot the numbers do not add up because the per-table times do not include query generation time, while the total does. I wish there were a different and more efficient API to run bulk inserts.
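
            For reference, one possibly more efficient bulk-insert path is a prepared statement combined with the driver's concurrent execution helper; a minimal sketch using the DiaObjectLast schema above (column list abbreviated, host and values are placeholders):

            # Sketch: prepared INSERT plus concurrent execution instead of one big batch.
            from cassandra.cluster import Cluster
            from cassandra.concurrent import execute_concurrent_with_args

            session = Cluster(["127.0.0.1"]).connect("PPDB")
            insert = session.prepare(
                "INSERT INTO DiaObjectLast (partId, pixelId, diaObjectId) VALUES (?, ?, ?)")

            rows = [(1, 10000, 42), (1, 10001, 43)]  # (partId, pixelId, diaObjectId) tuples
            results = execute_concurrent_with_args(session, insert, rows, concurrency=100)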

            salnikov Andy Salnikov added a comment -

            Self-reviewed and merged both dax_ppdb and l1dbproto; this is on branch u/andy-slac/cassandra, not on master. Closing the ticket; the next step is to find a more reasonable setup for further testing.


              People

              Assignee: Andy Salnikov (salnikov)
              Reporter: Andy Salnikov (salnikov)
              Watchers: Andy Salnikov, Colin Slater, Kian-Tat Lim
              Votes: 0

                Dates

                Created:
                Updated:
                Resolved:

                  Jenkins

                  No builds found.