# Fix a race condition in the Qserv Ingest worker


#### Details

• Type: Bug
• Status: Done
• Resolution: Done
• Fix Version/s: None
• Component/s:
• Labels:
None
• Epic Link:
• Sprint:
DB_S21_12
• Team:
Data Access and Database
• Urgent?:
No

# The bug

The current implementation of the Ingest service at Qserv workers has a loophole that can leave successfully ingested table contributions in a table after the corresponding super-transaction has been aborted. The problem typically shows up when an ingest workflow aborts a transaction while it still has unfinished ingest requests in the context of that transaction. This tends to happen in complex parallel workflows, especially when the workflow does not properly coordinate transaction management with the ingest of contributions.

The root cause of the problem was found in the following sequence of actions taken by the ingest service on a contribution request made by a client:

1. Analyze the status of the transaction. If it's ABORTED, report back to the client and exit.
2. Begin pulling the contribution from the object store via HTTP, reading from a locally mounted filesystem, or receiving the contribution's data sent by the client of the binary protocol.
3. Process the contribution and store it in the temporary filesystem.
4. Create the MySQL partition corresponding to the transaction (that was validated earlier at step 1) if none existed.
5. Upload the contribution into the table.
6. Report SUCCESS back to the client.

The algorithm allows a race condition between steps 1 and 5 of the sequence.

## The solution

The proposed solution is to introduce an additional check after step 5 to ensure the transaction has not been ABORTED since its status was checked at step 1. If it has been, the service eliminates the corresponding MySQL partition.

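The algorithm relies on the one-way approach to changing states of the "super-transactions" in the Replication/Ingest system. A transaction never returns to STARTED, so a check made after step 5 cannot miss an abort that happened earlier. The only allowed transitions are: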

• from STARTED to ABORTED, or
• from STARTED to FINISHED
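The extra check can be sketched as follows. This is a minimal Python illustration under stated assumptions, not the worker's actual code: `TransactionRegistry`, `PartitionedTable`, and `ingest` are hypothetical names, an in-memory dictionary stands in for the MySQL partitions, and the `on_loaded` hook exists only to simulate an abort arriving at the worst possible moment.

```python
from enum import Enum


class State(Enum):
    STARTED = "STARTED"
    ABORTED = "ABORTED"
    FINISHED = "FINISHED"


class TransactionRegistry:
    """Hypothetical stand-in for the persistent transaction state."""

    def __init__(self):
        self._states = {}

    def begin(self, trans_id):
        self._states[trans_id] = State.STARTED

    def state(self, trans_id):
        return self._states[trans_id]

    def end(self, trans_id, new_state):
        # One-way transitions only: STARTED -> ABORTED or STARTED -> FINISHED.
        if self._states[trans_id] is not State.STARTED:
            raise ValueError("transaction id=%d is not active" % trans_id)
        if new_state is State.STARTED:
            raise ValueError("a transaction cannot be restarted")
        self._states[trans_id] = new_state


class PartitionedTable:
    """Hypothetical stand-in for a MySQL table partitioned by transaction id."""

    def __init__(self):
        self.partitions = {}

    def add_partition(self, trans_id):
        self.partitions.setdefault(trans_id, [])

    def load(self, trans_id, rows):
        self.partitions[trans_id].extend(rows)

    def drop_partition(self, trans_id):
        self.partitions.pop(trans_id, None)


def ingest(registry, table, trans_id, rows, on_loaded=None):
    # Step 1: reject the request if the transaction is not active.
    if registry.state(trans_id) is not State.STARTED:
        return "ERROR: transaction id=%d is not active" % trans_id
    # Steps 2-3 (pulling and processing the input) are omitted here.
    # Steps 4-5: create the partition if needed and load the rows.
    table.add_partition(trans_id)
    table.load(trans_id, rows)
    if on_loaded is not None:
        on_loaded()  # test hook: lets a caller abort the transaction here
    # New check: the state may have changed since step 1.
    if registry.state(trans_id) is State.ABORTED:
        table.drop_partition(trans_id)
        return "ERROR: transaction %d got aborted during the ingest" % trans_id
    # Step 6.
    return "SUCCESS"
```

Calling `ingest(..., on_loaded=lambda: registry.end(trans_id, State.ABORTED))` in this sketch reproduces the race: without the final check the rows would stay in the table, with it the partition is dropped and an error is returned.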

## Notes

While it's perfectly clear what to do about the scenario presented above (transactions being aborted during ingests), it's not clear how to treat transactions that were successfully committed while there were outstanding ingests. One option would be to do nothing and assume that the newly ingested contribution was legitimate. Another possibility would be to report an error code to the workflow (and optionally delete the MySQL partition). In general, there is no perfect solution to some ordering mistakes that could be made by the ingest workflows. Perhaps adding a Q&A mechanism to the system would help to identify issues a posteriori? The current implementation of the system has enough information in its persistent state to discover abnormalities of the catalog ingest after it's over. The findings could be used for manual or semi-automated post-ingest fixes if needed.
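One way such an a posteriori check might look is sketched below. It assumes the contribution report has already been parsed into a list of transactions shaped like the JSON used in the Testing comment (`state`, `contrib`, `files`, `table`, `success`). The fields `id`, `end_time`, and `load_time` are assumptions made for this illustration, and the sample data is hand-made rather than real service output.

```python
def find_late_contributions(transactions):
    """Return (transaction id, table, load_time) for successful contributions
    that finished loading after their transaction had already ended."""
    late = []
    for t in transactions:
        if t["state"] == "STARTED":
            continue  # still open, nothing to compare against
        for f in t["contrib"]["files"]:
            # 'end_time' and 'load_time' are hypothetical field names.
            if f["success"] != 0 and f["load_time"] > t["end_time"]:
                late.append((t["id"], f["table"], f["load_time"]))
    return late


# Hand-made sample in the assumed layout (not real service output).
sample = [
    {"id": 1, "state": "FINISHED", "end_time": 1000,
     "contrib": {"files": [
         {"table": "gaia_source", "success": 1, "load_time": 900},
         {"table": "gaia_source", "success": 1, "load_time": 1005},
     ]}},
    {"id": 2, "state": "STARTED", "end_time": 0,
     "contrib": {"files": [
         {"table": "gaia_source", "success": 1, "load_time": 950},
     ]}},
]

for trans_id, table, load_time in find_late_contributions(sample):
    print("transaction", trans_id, "table", table, "loaded at", load_time)
```

Whether a flagged contribution should be kept or removed is exactly the policy question raised above; the sketch only locates the candidates.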

#### Activity

Igor Gaponenko added a comment -

PR: https://github.com/lsst/qserv/pull/614

Igor Gaponenko added a comment - edited

# Testing

The new code has been tested in the "large" cluster at NCSA. During the test the table gaia_source (similar to LSST's Object) of the catalog GAIA DR2 was ingested into Qserv using 480 parallel super-transactions. The catalog's size is about 1 TB. There are 313684 individual file contributions into the 146320 chunk tables and the corresponding overlap tables. A few rounds of tests were made. In the first round, the catalog was loaded without interruptions. During the next two rounds, multiple ongoing transactions were aborted during ingest to test the behavior of the worker ingest services. The tests were successful. In particular, the workers' Ingest services sent back (to the ingest workflow) errors on unexpected transaction state changes detected by the new code. For example:

    transaction 15769 changed state to ABORTED while the input file was being prepared for the ingest.
    transaction id=15769 is not active
    transaction 15774 got aborted while the file was being ingested into the table.
    transaction id=15774 is not active
    transaction 15771 is not active
    transaction id=15771 is not active
    transaction 15772 is not active
    ...

After the database was published, the number of rows in the table gaia_source was verified against a report for the so-called "transaction contributions" recorded in the persistent state of the Replication/Ingest system. Apparently, the numbers match.
Here is what was reported by Qserv:

    % mysql --protocol=tcp -hlocalhost -P4040 -uqsmaster \
        -e 'SELECT COUNT(*) FROM test101.gaia_source'
    +-----------+
    | COUNT(*)  |
    +-----------+
    | 572256457 |
    +-----------+

And here is the report from the contributions:

    transactions.total:  1440
                .success: 145
    gaia_source
      contribs.total.chunks:  468627
                    .overlaps: 472411
            .success.chunks:  52757
                    .overlaps: 52970
      rows.total.chunks:  5076013582
                .overlaps: 2095177889
        .success.chunks:  572256457
                .overlaps: 236658130

NOTE: the number of the successfully ingested rows is found in rows.success.chunks: 572256457. The number matches the one reported by Qserv.

The REST request that was used to obtain the contributions:

    curl 'http://localhost:25081/ingest/trans?database=test101&contrib=1&contrib_long=1' \
        -o test101_contrib.json

See the documentation on the REST service at https://confluence.lsstcorp.org/display/DM/Ingest%3A+11.1.3.3.+Get+info+on+transactions

And here is the Python code for analyzing the data and making the report:

    import json

    database = "test101"

    num_transactions = 0
    num_transactions_success = 0
    num_contribs = {}
    num_contribs_success = {}
    num_rows = {}
    num_rows_success = {}

    # Load the report saved by the curl command above.
    with open("{}_contrib.json".format(database)) as fh:
        d = json.load(fh)

    for t in d['databases'][database]['transactions']:
        num_transactions += 1
        finished = t['state'] == 'FINISHED'
        if finished:
            num_transactions_success += 1
        for f in t['contrib']['files']:
            table = f['table']
            if table not in num_contribs:
                num_contribs[table] = {'chunk': 0, 'overlap': 0}
                num_rows[table] = {'chunk': 0, 'overlap': 0}
                num_contribs_success[table] = {'chunk': 0, 'overlap': 0}
                num_rows_success[table] = {'chunk': 0, 'overlap': 0}

            # Each contribution goes either into a chunk table or its overlap table.
            kind = 'overlap' if f['overlap'] != 0 else 'chunk'
            num_contribs[table][kind] += 1
            num_rows[table][kind] += f['num_rows']

            # A contribution counts as successful only if it was loaded
            # successfully and its transaction was committed.
            if finished and (f['success'] != 0):
                num_contribs_success[table][kind] += 1
                num_rows_success[table][kind] += f['num_rows']

    print("transactions.total: ", num_transactions)
    print("            .success:", num_transactions_success)
    for table in num_contribs:
        print(table)
        print("  contribs.total.chunks: ", num_contribs[table]['chunk'])
        print("                .overlaps:", num_contribs[table]['overlap'])
        print("        .success.chunks: ", num_contribs_success[table]['chunk'])
        print("                .overlaps:", num_contribs_success[table]['overlap'])
        print("  rows.total.chunks: ", num_rows[table]['chunk'])
        print("            .overlaps:", num_rows[table]['overlap'])
        print("    .success.chunks: ", num_rows_success[table]['chunk'])
        print("            .overlaps:", num_rows_success[table]['overlap'])

Andy Salnikov added a comment -

Code looks good. I'm still not sure that the logic is completely fail-proof. What would happen if, between steps 4 and 5, another client requests deletion of the transaction/partition? Would it cause something bad at step 5? I suspect some sort of locking on the transaction state may be needed to handle concurrency correctly.

Igor Gaponenko added a comment - edited

This is a very good question! I think we should be fine here as long as the table remains partitioned, even if the partition is gone. In this case, step 5 will simply fail, and the corresponding error code will be sent back to the client. The details on this can be found in the MySQL/MariaDB documentation.

What I would really be concerned about is a scenario in which the table gets transformed into a non-partitioned one between steps 4 and 5. In this case, the delayed contribution (wanted or unwanted, depending on how the transaction ended up) would be successfully loaded into the monolithic (non-partitioned) table. This can only happen if the catalog was published: one of the actions taken by the ingest system during catalog publishing is to transform all tables into the non-partitioned form. Before publishing a catalog the system ensures that all transactions were committed or aborted (committing or aborting transactions is still a responsibility of the ingest workflows, not the ingest system). Unfortunately, there is still a chance that some incorrectly implemented ingest workflow would succeed in finishing all transactions and publishing a catalog while having "run-away" loading requests in between steps 4 and 5. In practice, the probability of running into this scenario is extremely low, but it's not zero.

I think I know how to address this problem, though. The current implementation of the system already has (and uses) a mechanism of transient (not MySQL) process-level locks on the DDL operations over tables (there is a dedicated mutex for each table). These operations include things like creating/deleting tables, adding/removing partitions, or transforming tables from the partitioned form into the monolithic one. I can use this mechanism to group the following operations over the same table into the sequence shown below:

- acquire the transient lock for the table (RAII)
- check if the transaction is still open (if not open then report an error and quit)
- create a MySQL partition (if it still doesn't exist)
- load data into the table
- check again if the transaction wasn't aborted (if aborted then remove the partition, report an error, and quit)

This should work because all operations over the table are performed by various threads within the same process, and the locking mechanism has the process-level scope. So, the sequence (once the lock is acquired) will prevent other threads from making any modifications to the partitioning status of the table.
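The locked sequence could be sketched roughly like this. It is a Python illustration under stated assumptions rather than the service's actual code: `table_lock`, `load_contribution`, and `drop_partition` are hypothetical names, a `with` block plays the role of the RAII guard, and a dictionary stands in for the MySQL partitions.

```python
import threading
from collections import defaultdict

# One process-level mutex per table (hypothetical registry of transient locks).
_table_locks = defaultdict(threading.Lock)
_registry_guard = threading.Lock()


def table_lock(table_name):
    # Guard the dictionary itself so that two threads never create two
    # different locks for the same table.
    with _registry_guard:
        return _table_locks[table_name]


def load_contribution(table_name, trans_id, rows, get_state, partitions):
    """get_state(trans_id) returns 'STARTED', 'ABORTED' or 'FINISHED';
    partitions maps (table_name, trans_id) to a list of rows."""
    with table_lock(table_name):  # released on exit, like an RAII guard
        # Check if the transaction is still open.
        if get_state(trans_id) != "STARTED":
            return "ERROR: transaction id=%d is not active" % trans_id
        # Create the partition if it doesn't exist yet, then load the data.
        partition = partitions.setdefault((table_name, trans_id), [])
        partition.extend(rows)
        # Check again that the transaction wasn't aborted in the meantime.
        if get_state(trans_id) == "ABORTED":
            partitions.pop((table_name, trans_id), None)
            return "ERROR: transaction %d got aborted" % trans_id
        return "SUCCESS"


def drop_partition(table_name, trans_id, partitions):
    """A DDL-like operation; it takes the same per-table lock, so it can
    never interleave with the sequence in load_contribution()."""
    with table_lock(table_name):
        partitions.pop((table_name, trans_id), None)
```

In this sketch, loads into the same table are serialized while loads into different tables still run in parallel, which is the trade-off between safety and parallel ingest throughput that the locking approach implies.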

While what's presented above would work, I'm still a bit concerned that this coupling of the DDL and the data loading operations may affect the overall performance of the ingest system for parallel ingest into the same table. To what extent this may affect the performance requires further investigation. I suggest that we do this potential investigation/improvement as a separate effort in a separate ticket. The good news is that the ingest system records enough information in its internal persistent bookkeeping state (including transaction contributions and the timing of the critical operations) to detect abnormalities of this kind, and to locate and eliminate extraneous data from the tables. This could be used as a workaround until a better solution (like the one presented above) is implemented.

UPDATED: The following ticket DM-29882 has been registered to investigate/address the issue.


#### People

Assignee:
Igor Gaponenko
Reporter:
Igor Gaponenko
Reviewers:
Andy Salnikov
Watchers:
Andy Salnikov, Fabrice Jammes, Fritz Mueller, Hsin-Fang Chiang, Igor Gaponenko
