Uploaded image for project: 'Data Management'
  1. Data Management
  2. DM-2077

W16 Add Support for Multi-table Shared Scans

    Details

    • Type: Epic
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: Qserv
    • Labels:
      None
    • Epic Name:
      W16 Multi-table shared scans
    • Story Points:
      113
    • WBS:
      02C.06.02.03
    • Team:
      Data Access and Database
    • Cycle:
      Winter 2016

      Description

      Implement multi-table shared scans. Ensure that shared-scans are not delaying interactive queries. The baseline architecture of the shares scans are described in LDM-135 Shared Scanning.

        Attachments

          Issue Links

            Activity

            Hide
            jbecla Jacek Becla added a comment -

            Comment from Daniel (transferred from DM-2096):

            Worker parallelism isn't a big deal at this point. The parallelism right now is matched to the schedulers managing I/O, and only somewhat related to core count. In this context, not a lot of tuning and management is needed. I think that once the shared scan scheduler can reasonably handle concurrent table scans, then all we need are config parameters (command-line, or altered dynamically) to tune. Is this an epic? I would keep the work related to this in the same epic as the shared scan improvements.

            Show
            jbecla Jacek Becla added a comment - Comment from Daniel (transferred from DM-2096 ): Worker parallelism isn't a big deal at this point. The parallelism right now is matched to the schedulers managing I/O, and only somewhat related to core count. In this context, not a lot of tuning and management is needed. I think that once the shared scan scheduler can reasonably handle concurrent table scans, then all we need are config parameters (command-line, or altered dynamically) to tune. Is this an epic? I would keep the work related to this in the same epic as the shared scan improvements.
            Hide
            abh Andy Hanushevsky added a comment -

            The following is a straw man proposal for implementing a revised shared scan. The proposal seeks simplicity. I also point out some optimizations and, consequently, more complicated solutions. Finally, I end with some open questions. Anyway, I call this a single cursor shared scan (SCSS).

            An SCSS is implemented as a shared scan manager object, say class SSManager. It does not actually run queries but is used to sequence queries for optimal memory and I/O utilization. The SCSS manager is initialized with the following pieces of information: a) std::vector<std:string> of chunk file paths, b) maximum number of queries to run at once, and c) transfer rate (default is 50MB/Sec).

            Once initialized, it can be used to control the running of queries across all of the registered chunks. Each query is represented by a query object, say class SSQuery, that has a single callback method SSQuery::Run(std::string chunkname, bool lastchunk). This callback is used to tell the query creator to run the query on a particular chunk. The name of the chunk is passed and if the query is running on the last chunk, “lastchunk” is set to true.

            When qserv wants to run a shared scan query, it creates an SSQuery object and calls SSManager::AddQuery(SSQuery *). This method adds the query to the queue of queries to be run. There are actually two logical queues: a) the active queue containing all currently running queries; and b) the pending queue which contains all queries that need to run. Assume that variable k is the current cursor position (i.e. the index into the vector of chunk file paths). The execution sequence is as follows:

            AddQuery() simply places the SSQuery at the end of the pending queue. Pending queries are considered for inclusion only when the cursor advances. The cursor advances when all of the queries at position k complete. When this occurs, the memory occupied by chunk k is unlocked. Then the file for chunk k+1 is memory mapped, locked in memory and optionally pre-population is started. All SSQuery objects that have run on the last chunk are removed from the active queue. The cursor is advanced. If the number of active queries is less than the maximum allowed, sufficient queries are moved from the pending queue to the active queue to reach the maximum. Then SSQuery::Run(std::string chunkname, bool lastchunk) is called for each query in the active queue.

            That’s pretty much it. So, yes, it is very simple. Here are optimizations and open questions:

            Optimization: Chunk piggy-backing.
            When AddQuery() is called, the new query is not added to the currently running active set even if the active set is not full. An optimization would be to add the query to the active set. However, if this is done we need to make sure that we do not unduly delay the current set with the new query. We can do this (sort of) by seeing if there is sufficient time remaining to fully process the new query. We can do this by estimating the percentage of the expected deadline has gone by. The estimator can be the transfer rate. So, the deadline is t+(size_of_chunk/transfer_rate). If less than 20% (arbitrary) of the time has elapsed then we can add the query, otherwise the query must wait for the cursor to advance. It is not clear how much this actually buys us since the deadline is rather crude and there is no good way to do a good approximation without knowing the actual query speed.

            Optimization: Fast Forwarding or Dual Cursor Shared Scan (DCSS)
            The SCSS suffers from the problem that the fastest query is no faster than the slowest active query. This is because we do not advance the cursor until all of the current queries are done. This problem can be mitigated to some extend by allowing faster queries to race ahead of the slower queries. Here we implement two cursors, k and k’. If we find that 50% or more queries have completed, those queries are assigned to cursor k’ which is one ahead of k and is allowed to independently advance. Thus, slow queries have a much smaller impact on fast queries. The side-effects are significant as this requires more memory to be locked and may substantially increase disk I/O. The benefits are somewhat dubious as it is not clear that a query executes at the same rate for all chunks. If it does not, then this optimization is not particularly good (see the next optimization).

            Optimization: Query Bagging
            When a query is added it could be tagged as “slow”, “average”, or “fast”. The idea here is to add queries to the active set that have roughly the same execution speed. So, when running a set of “slow” queries, only “slow” queries would be added to the active set if at all possible. Needless to say, the scheduling is rather complex because we need to prevent query starvation. Depending on how the scheduling is actually implemented, we could get into a situation where less than the maximum number of queries would run from time to time as we transition from one bag to another. This can be mitigated by simply having three independent query managers (one for each type of query) and would be much simpler to implement though it would triple memory usage and disk I/O. Query bagging is better than the DCSS optimization but requires that queries be accurately categorized.

            Optimization: Non-sequential Ordering
            The simplicity of the SCSS is founded on the premise that we always process chunks in a sequential order. This really makes for uncomplicated code, but suffers from turbulent execution speed (or let’s says it is chunky). If we allow queries to enter and leave the active set in arbitrary order we can, over time, smooth out the flow. However, that means we need to keep track of all the chunks the query has touched and all the chunks that still remain (e.g. using a bit vector). It is not clear this actually buys us enough to warrant the complication.

            Optimization: Double Buffering
            This optimization is rather simple and usually quite effective. When all the queries for chunk k are started, the manager prepares chunk k+1. The idea is to make sure the pages of chunk k+1 are in memory before any query actually starts running on that chunk (ideally anyway). This of course increases memory usage. While there is no additional disk I/O, the disk I/O that there is may interfere with the current running queries. Double buffering becomes more intrusive as we add cursors and is rather complicated when using non-sequential ordering.

            Below are some of the open questions.

            Question: How does the manager know that a query completed?
            There are two obvious approaches here. We can simply say that a query completes when SSQuery::Run() returns. This means that the callback needs to be executed on a new thread. Furthermore, it makes a great deal of assumptions on how mySQL queries are actually run. Alternatively, if we assume that SSQuery::Run() does a simple setup of the query and spawns a new thread if need be. In other words, it does very little actual processing and a return means nothing other than the query has been dispatched. This would require a new method, SSManager::QueryDone(SSQuery *), to be added and must be called when the query completes. Whether or not this is automatic when the last chunk is dispatched is an open question.

            Question: How would one abort a query?
            The simplest approach is to add an SSManager::RemoveQuery(SSQuery *) method. The idea here is that one should be able to remove a query at any time if, for example, the query failed on a particular chunk and there is no reason to continue. If query completion notification is done by a return from SSQuery::Run() then the call would need to return a value indicating whether one should proceed to the next chunk or remove the query from all queues.

            Question: Can queries be suspended?
            There is no reason why a query could not be suspended. All this means that it is moved to a suspend queue via a SSManager::SuspendQuery(SSQuery *) call and placed back to the top of the pending queue when SSManager::ResumeQuery(SSQuery *) is called. Suspension takes place when the cursor is advanced. In the sequential model, a resumed query may need to wait for the cursor to go fully around before it actually resumes. Whether or not this is a useful feature is questionable.

            Question: What about multiple table scans?
            This is a very sticky question. It seems to be spawned by the possibility that a query needs access to multiple tables which reside in different chunks. As far as I can see, the concurrency requirements are arbitrary, solely dependent on the query. In such a circumstance, if we require that all necessary chunks be in memory before the query starts then it essentially leads us to a bin-packing problem (i.e. NP-hard). So, I don’t have a good solution for this one. Perhaps I misunderstand what is really needed.

            Comments?

            Show
            abh Andy Hanushevsky added a comment - The following is a straw man proposal for implementing a revised shared scan. The proposal seeks simplicity. I also point out some optimizations and, consequently, more complicated solutions. Finally, I end with some open questions. Anyway, I call this a single cursor shared scan (SCSS). An SCSS is implemented as a shared scan manager object, say class SSManager. It does not actually run queries but is used to sequence queries for optimal memory and I/O utilization. The SCSS manager is initialized with the following pieces of information: a) std::vector<std:string> of chunk file paths, b) maximum number of queries to run at once, and c) transfer rate (default is 50MB/Sec). Once initialized, it can be used to control the running of queries across all of the registered chunks. Each query is represented by a query object, say class SSQuery, that has a single callback method SSQuery::Run(std::string chunkname, bool lastchunk). This callback is used to tell the query creator to run the query on a particular chunk. The name of the chunk is passed and if the query is running on the last chunk, “lastchunk” is set to true. When qserv wants to run a shared scan query, it creates an SSQuery object and calls SSManager::AddQuery(SSQuery *). This method adds the query to the queue of queries to be run. There are actually two logical queues: a) the active queue containing all currently running queries; and b) the pending queue which contains all queries that need to run. Assume that variable k is the current cursor position (i.e. the index into the vector of chunk file paths). The execution sequence is as follows: AddQuery() simply places the SSQuery at the end of the pending queue. Pending queries are considered for inclusion only when the cursor advances. The cursor advances when all of the queries at position k complete. When this occurs, the memory occupied by chunk k is unlocked. Then the file for chunk k+1 is memory mapped, locked in memory and optionally pre-population is started. All SSQuery objects that have run on the last chunk are removed from the active queue. The cursor is advanced. If the number of active queries is less than the maximum allowed, sufficient queries are moved from the pending queue to the active queue to reach the maximum. Then SSQuery::Run(std::string chunkname, bool lastchunk) is called for each query in the active queue. That’s pretty much it. So, yes, it is very simple. Here are optimizations and open questions: Optimization: Chunk piggy-backing. When AddQuery() is called, the new query is not added to the currently running active set even if the active set is not full. An optimization would be to add the query to the active set. However, if this is done we need to make sure that we do not unduly delay the current set with the new query. We can do this (sort of) by seeing if there is sufficient time remaining to fully process the new query. We can do this by estimating the percentage of the expected deadline has gone by. The estimator can be the transfer rate. So, the deadline is t+(size_of_chunk/transfer_rate). If less than 20% (arbitrary) of the time has elapsed then we can add the query, otherwise the query must wait for the cursor to advance. It is not clear how much this actually buys us since the deadline is rather crude and there is no good way to do a good approximation without knowing the actual query speed. Optimization: Fast Forwarding or Dual Cursor Shared Scan (DCSS) The SCSS suffers from the problem that the fastest query is no faster than the slowest active query. This is because we do not advance the cursor until all of the current queries are done. This problem can be mitigated to some extend by allowing faster queries to race ahead of the slower queries. Here we implement two cursors, k and k’. If we find that 50% or more queries have completed, those queries are assigned to cursor k’ which is one ahead of k and is allowed to independently advance. Thus, slow queries have a much smaller impact on fast queries. The side-effects are significant as this requires more memory to be locked and may substantially increase disk I/O. The benefits are somewhat dubious as it is not clear that a query executes at the same rate for all chunks. If it does not, then this optimization is not particularly good (see the next optimization). Optimization: Query Bagging When a query is added it could be tagged as “slow”, “average”, or “fast”. The idea here is to add queries to the active set that have roughly the same execution speed. So, when running a set of “slow” queries, only “slow” queries would be added to the active set if at all possible. Needless to say, the scheduling is rather complex because we need to prevent query starvation. Depending on how the scheduling is actually implemented, we could get into a situation where less than the maximum number of queries would run from time to time as we transition from one bag to another. This can be mitigated by simply having three independent query managers (one for each type of query) and would be much simpler to implement though it would triple memory usage and disk I/O. Query bagging is better than the DCSS optimization but requires that queries be accurately categorized. Optimization: Non-sequential Ordering The simplicity of the SCSS is founded on the premise that we always process chunks in a sequential order. This really makes for uncomplicated code, but suffers from turbulent execution speed (or let’s says it is chunky). If we allow queries to enter and leave the active set in arbitrary order we can, over time, smooth out the flow. However, that means we need to keep track of all the chunks the query has touched and all the chunks that still remain (e.g. using a bit vector). It is not clear this actually buys us enough to warrant the complication. Optimization: Double Buffering This optimization is rather simple and usually quite effective. When all the queries for chunk k are started, the manager prepares chunk k+1. The idea is to make sure the pages of chunk k+1 are in memory before any query actually starts running on that chunk (ideally anyway). This of course increases memory usage. While there is no additional disk I/O, the disk I/O that there is may interfere with the current running queries. Double buffering becomes more intrusive as we add cursors and is rather complicated when using non-sequential ordering. Below are some of the open questions. Question: How does the manager know that a query completed? There are two obvious approaches here. We can simply say that a query completes when SSQuery::Run() returns. This means that the callback needs to be executed on a new thread. Furthermore, it makes a great deal of assumptions on how mySQL queries are actually run. Alternatively, if we assume that SSQuery::Run() does a simple setup of the query and spawns a new thread if need be. In other words, it does very little actual processing and a return means nothing other than the query has been dispatched. This would require a new method, SSManager::QueryDone(SSQuery *), to be added and must be called when the query completes. Whether or not this is automatic when the last chunk is dispatched is an open question. Question: How would one abort a query? The simplest approach is to add an SSManager::RemoveQuery(SSQuery *) method. The idea here is that one should be able to remove a query at any time if, for example, the query failed on a particular chunk and there is no reason to continue. If query completion notification is done by a return from SSQuery::Run() then the call would need to return a value indicating whether one should proceed to the next chunk or remove the query from all queues. Question: Can queries be suspended? There is no reason why a query could not be suspended. All this means that it is moved to a suspend queue via a SSManager::SuspendQuery(SSQuery *) call and placed back to the top of the pending queue when SSManager::ResumeQuery(SSQuery *) is called. Suspension takes place when the cursor is advanced. In the sequential model, a resumed query may need to wait for the cursor to go fully around before it actually resumes. Whether or not this is a useful feature is questionable. Question: What about multiple table scans? This is a very sticky question. It seems to be spawned by the possibility that a query needs access to multiple tables which reside in different chunks. As far as I can see, the concurrency requirements are arbitrary, solely dependent on the query. In such a circumstance, if we require that all necessary chunks be in memory before the query starts then it essentially leads us to a bin-packing problem (i.e. NP-hard). So, I don’t have a good solution for this one. Perhaps I misunderstand what is really needed. Comments?
            Hide
            jbecla Jacek Becla added a comment -

            Andy, I propose to put this narrative in a dedicated confluence page, as it is easier to keep track of modifications there (and we can still discuss things through comments). So I am planning to move your text to confluence, unless someone objects (soon).

            Show
            jbecla Jacek Becla added a comment - Andy, I propose to put this narrative in a dedicated confluence page, as it is easier to keep track of modifications there (and we can still discuss things through comments). So I am planning to move your text to confluence, unless someone objects (soon).
            Hide
            abh Andy Hanushevsky added a comment -

            Hi Jacek,

            I have no complaints on that score. Thouigh, I suppose, it's another
            communication method to keep track of

            Andy

            Show
            abh Andy Hanushevsky added a comment - Hi Jacek, I have no complaints on that score. Thouigh, I suppose, it's another communication method to keep track of Andy
            Hide
            jbecla Jacek Becla added a comment -

            you can watch confluence pages and receive emails on every update/comment, it is not too bad!

            Show
            jbecla Jacek Becla added a comment - you can watch confluence pages and receive emails on every update/comment, it is not too bad!
            Hide
            jbecla Jacek Becla added a comment -

            I copied Andy's narrative to Confluence, see Shared Scans, let's fine-tune it the design and have a discussion there.

            Show
            jbecla Jacek Becla added a comment - I copied Andy's narrative to Confluence, see Shared Scans , let's fine-tune it the design and have a discussion there.

              People

              • Assignee:
                jgates John Gates
                Reporter:
                fritzm Fritz Mueller
                Watchers:
                Andy Hanushevsky, Fritz Mueller, Jacek Becla
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Summary Panel