  Data Management / DM-16404

Implement multi-node support in ap_proto


    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: L1 Database
    • Labels:

      Description

      For more realistic tests I'd like to add support for running ap_proto on many hosts, maybe using MPI or some other sort of IPC (it is a low-level data exchange, so the choice of mechanism is not critical).

        Attachments

          Issue Links

            Activity

            salnikov Andy Salnikov added a comment -

            I think the simplest approach is to use MPI for IPC; Slurm at NCSA is already set up to run MPI tasks, so that simplifies things a lot.

            With MPI we can run one task per "tile" (CCD), but there has to be one "master" process which coordinates visit processing. That master can actually be one of the tile processes (the usual MPI approach is to use rank 0 for coordination).
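
            A minimal sketch of that layout with mpi4py (just a sketch, not actual ap_proto code); the per-visit details are in the list below:

              # One MPI rank per tile; rank 0 doubles as the coordinator.
              # Run with e.g. `mpiexec -n <n_tiles> python ap_proto_sketch.py` (script name is illustrative).
              from mpi4py import MPI

              comm = MPI.COMM_WORLD
              rank = comm.Get_rank()   # 0 .. size-1, one rank per tile
              size = comm.Get_size()   # total number of tile processes

              if rank == 0:
                  # coordinator duties: pick the pointing, simulate DIA, split the FOV, ...
                  pass
              # every rank, including rank 0, then processes its own tile for the visit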

            Current fork-based implementation workflow (a sketch of the fork-per-tile step follows the list):

            • there is a pre-generated map (coordinates) of variable sources, randomly placed so that there are 10k sources on average in the FOV
            • when main process starts it reads all those sources into memory and runs a visit loop
            • for each visit:
              • generates random pointing direction and makes a circular region for FOV at that pointing
              • simulates DIA by selecting variable sources from the circle region and adding some false positives from noise and transients (adds ~50% more sources); this makes a list of DIASources (~15k)
              • splits circle region into square tiles (e.g. 15x15 tiles, can be different number)
              • for each tile region:
                • forks and runs a visit processing algorithm using only DIASources which fall into a tile region:
                  • generates a set of HTM indices enveloping a tile region
                  • reads DIAObjects using those HTM indices 
                  • generates new DIAObjects from DIASources (this step should normally be done after reading the Sources history from the database, but here it does not matter)
                  • does forced photometry for non-detected DIAObjects, extends the list of new DIAObjects
                  • generates new DIASources and DIAForcedSources
                  • reads history of DIASources and DIAForcedSources from database (normally this has to be done earlier but we don't care)
                  • stores new DIAObjects, DIASources, and DIAForcedSources in the database
                  • forked process exits after processing single visit/tile
              • parent process waits until all forked children finish and moves to the next visit
              • (also, if there is no tiling requested then the whole FOV region is processed in the same process, without forking)
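
            A minimal sketch of the fork-per-tile step above (illustrative only; tiles_for(), sources_in() and process_tile() are hypothetical helpers, not the real ap_proto API):

              # Sketch of the fork-per-tile step of the visit loop.
              # tiles_for(), sources_in(), process_tile() are hypothetical placeholders.
              import os

              def run_visit_forked(fov_region, dia_sources, num_tiles):
                  children = []
                  for tile_region in tiles_for(fov_region, num_tiles):
                      pid = os.fork()
                      if pid == 0:
                          # child: process only the DIASources that fall into this tile
                          process_tile(tile_region, sources_in(tile_region, dia_sources))
                          os._exit(0)   # child exits after a single visit/tile
                      children.append(pid)
                  # parent waits for all children before moving on to the next visit
                  for pid in children:
                      os.waitpid(pid, 0)
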
            salnikov Andy Salnikov added a comment -

            How can this workflow be re-implemented using MPI? We are going to run one process per tile, with one process coordinating visit processing (generating the pointing direction and maybe the sources). The question is what each process's responsibilities are. Potentially I can limit coordination to just generating the pointing direction and splitting the FOV into tiles, so that each tile process would need to make its own list of sources. A potential issue here is that DIA (which makes the sources) is implemented to run on a circular region (FOV) and does not work with square tiles. I'm not sure I want to mess with that code yet to make it more generic (the issues are generating uniformly distributed random sources on the sphere, and the FOV/tile intersection). It is probably easier to generate sources in the "main" process as we do now and then distribute those sources to each tile process. The number of sources per tile is not too large (15k/~200 makes less than 100 sources) and they can easily be moved between processes.

            With that idea the above workflow can be transformed into the following (a sketch of the scatter/gather structure follows the list):

            • on startup "main" process reads variable sources map, others can skip it
            • for each visit
              • main process
                • generates random pointing direction and makes a circular region for FOV at that pointing
                • simulates DIA for the whole FOV, generating list of sources
                • splits circle region into square tiles
                • maps each source to its tile, makes per-tile lists of sources
                • "scatters" these per-tile lists to matching tile processes
              • all processes:
                • read their per-tile source list
                • some synchronization at this point is probably needed if we want to measure timing realistically
                • do visit processing based on that source list (exactly the same thing as above)
                • send something back to the main process, and read the new list again
              • main process:
                • "gathers" info from all processes (that "something" in the previous point)
                • moves to the next visit
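
            A sketch of the scatter/gather structure of one visit, assuming mpi4py (make_pointing(), simulate_dia(), split_into_tiles() and process_tile() are hypothetical placeholders):

              # One visit with scatter/gather; helper functions are hypothetical placeholders.
              from mpi4py import MPI

              comm = MPI.COMM_WORLD
              rank = comm.Get_rank()

              if rank == 0:
                  fov = make_pointing()            # random pointing + circular FOV region
                  sources = simulate_dia(fov)      # ~15k DIASources for the whole FOV
                  # one list entry per rank, in rank order
                  per_tile = split_into_tiles(fov, sources, comm.Get_size())
              else:
                  per_tile = None

              my_sources = comm.scatter(per_tile, root=0)   # each rank gets its tile's sources
              comm.Barrier()                                # optional sync before timing the visit
              result = process_tile(my_sources)             # same per-tile processing as in fork mode
              results = comm.gather(result, root=0)         # rank 0 collects the "something" back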

            I also want to keep the forking (and the no-fork single-process) option as well, so the structure of the code may be a bit messy with all of those options.

            salnikov Andy Salnikov added a comment - edited

            Implementation of the multi-node ap_proto using MPI is ready and running now on the verification cluster. The implementation is roughly as above, with the small exception that the main process does not do per-tile source filtering; instead it sends the full list of per-FOV sources to each tile process, which does the filtering itself, just as was done before in fork mode. The reason for this is that filtering everything in one process is too slow (an N*M problem); it turns out that sending the full list runs faster.
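
            In mpi4py terms the change amounts to broadcasting the full source list instead of scattering pre-filtered per-tile lists; again just a sketch, with all helper functions hypothetical:

              # Broadcast variant: every rank gets the full per-FOV list and filters locally.
              # make_pointing(), simulate_dia(), my_tile_region(), sources_in(), process_tile() are placeholders.
              from mpi4py import MPI

              comm = MPI.COMM_WORLD
              rank = comm.Get_rank()

              sources = simulate_dia(make_pointing()) if rank == 0 else None
              sources = comm.bcast(sources, root=0)                    # full per-FOV source list to every rank
              my_sources = sources_in(my_tile_region(rank), sources)   # local filtering, exactly as in fork mode
              result = process_tile(my_sources)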

            With MPI we can also benefit from keeping the database connection alive; I will need to quantify how fast things run with MPI compared to fork. I will post some plots here before closing the ticket.

            salnikov Andy Salnikov added a comment -

            Some plots from my tests with MPI.

            Here is the total visit time as a function of visit number; there is a familiar bump at the beginning while "smart" Oracle learns how to properly query the DiaObjectLast table:

            Stripping that bump, one can see a nice drop in time at 10k visits - this is when I switched on connection caching; before that it was using the same mode as forking (a new connection for each visit). The gain from cached connections is about 2 seconds per visit:

            Here is the fit of the total visit time after enabling connection caching:

            At 30k visits the time is around 10 sec, which is again ~2 sec better than what we saw before on a single node with forking.

            I think the conclusion here is that using more cores on the client side does not help much by itself (i.e. the bottleneck is on the server side), but reusing/caching the database connection on the client side can save ~2 seconds per visit.
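
            Schematically, the difference is just where the connection is opened (cx_Oracle is used here only for illustration; the real client code may differ):

              # Per-visit connections vs. one cached connection; process_visit() is a placeholder.
              import cx_Oracle

              # fork mode: every child opens a fresh connection for each visit,
              # paying the connection setup cost every time
              def visit_new_connection(dsn):
                  conn = cx_Oracle.connect(dsn)
                  try:
                      process_visit(conn)
                  finally:
                      conn.close()

              # MPI mode: each long-lived rank opens one connection up front
              # and reuses it for all visits, so the setup cost is paid once
              def run_all_visits(dsn, num_visits):
                  conn = cx_Oracle.connect(dsn)
                  try:
                      for _ in range(num_visits):
                          process_visit(conn)
                  finally:
                      conn.close()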

            salnikov Andy Salnikov added a comment -

            Self-reviewed and merged. MPI works well enough for these tests; we do not see a big gain from adding more cores, but there is a small improvement from keeping the database connection open.


              People

              Assignee:
              salnikov Andy Salnikov
              Reporter:
              salnikov Andy Salnikov
              Watchers:
              Andy Salnikov, Vaikunth Thukral

                Dates

                Created:
                Updated:
                Resolved:

                  CI Builds

                  No builds found.