# Deal with large pickles

#### Details

#### Description

Lauren MacArthur is running:

    coaddDriver.py /tigress/HSC/HSC --rerun lauren/LSST/DM-6816/cosmos --job DM-6816-cosmos-y-coaddDriver --time 100 --cores 96 --batch-type=slurm --mpiexec='-bind-to socket' --id tract=0 filter=HSC-Y --selectId ccd=0..103 filter=HSC-Y visit=274..302:2^306..334:2^342..370:2^1858..1862:2^1868..1882:2^11718..11742:2^22602..22608:2^22626..22632:2^22642..22648:2^22658..22664:2 --batch-submit '--mem-per-cpu 8000'

and it is producing:

    OverflowError on tiger-r8c1n12:19889 in map: integer 2155421250 does not fit in 'int'
    Traceback (most recent call last):
      File "/tigress/HSC/LSST/stack_20160915/Linux64/ctrl_pool/12.1+5/python/lsst/ctrl/pool/pool.py", line 99, in wrapper
        return func(*args, **kwargs)
      File "/tigress/HSC/LSST/stack_20160915/Linux64/ctrl_pool/12.1+5/python/lsst/ctrl/pool/pool.py", line 218, in wrapper
        return func(*args, **kwargs)
      File "/tigress/HSC/LSST/stack_20160915/Linux64/ctrl_pool/12.1+5/python/lsst/ctrl/pool/pool.py", line 554, in map
        self.comm.scatter(initial, root=self.rank)
      File "MPI/Comm.pyx", line 1286, in mpi4py.MPI.Comm.scatter (src/mpi4py.MPI.c:109079)
      File "MPI/msgpickle.pxi", line 707, in mpi4py.MPI.PyMPI_scatter (src/mpi4py.MPI.c:48114)
      File "MPI/msgpickle.pxi", line 168, in mpi4py.MPI.Pickle.dumpv (src/mpi4py.MPI.c:41672)
      File "MPI/msgbuffer.pxi", line 35, in mpi4py.MPI.downcast (src/mpi4py.MPI.c:29070)
    OverflowError: integer 2155421250 does not fit in 'int'
    application called MPI_Abort(MPI_COMM_WORLD, 1) - process 0

We need to fix or work around this problem.
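The `downcast` frame at the bottom of the traceback is where mpi4py converts the pickle's length into the C `int` count that MPI expects; 2155421250 exceeds `INT_MAX` (2147483647) by 7937603 bytes. A minimal Python sketch of that check, assuming a 32-bit `int` as on the Linux cluster used here. The function only roughly mirrors mpi4py's internal helper and is not a public mpi4py API:

```python
# Hypothetical re-creation of the size check in mpi4py's msgbuffer.pxi
# ``downcast``: MPI count arguments are C ints.
INT_MAX = 2**31 - 1  # assumes a 32-bit signed C int (true on Linux x86-64)


def downcast(count):
    """Return ``count`` unchanged if it fits in a C int, else raise."""
    if count > INT_MAX:
        raise OverflowError("integer %d does not fit in 'int'" % count)
    return count


if __name__ == "__main__":
    # INT_MAX itself, then the pickle size reported in the traceback.
    for size in (INT_MAX, 2155421250):
        try:
            downcast(size)
            print(size, "fits")
        except OverflowError as exc:
            print(exc)
```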

#### Activity

Paul Price added a comment -

This appears to be a fundamental limitation in mpi4py and python:

    $ python -c 'import cPickle as pickle; print len(pickle.dumps("X"*2**31, -1))'
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
    SystemError: error return without exception set

I can reproduce the error with some simple code:

    import mpi4py.MPI as mpi
    import cPickle as pickle

    def main():
        comm = mpi.COMM_WORLD
        rank = comm.Get_rank()
        print rank, "OK"
        if rank == 0:
            size = int(2**31 - 1)
            string = "X"*size
            print "MASTER:", len(pickle.dumps(string, -1))
        else:
            string = None
        string = comm.bcast(string, root=0)
        print rank, len(string)

    if __name__ == "__main__":
        main()

Running this under MPI:

    $ mpiexec -n 2 python test.py
    0 OK
    1 OK
    MASTER: 2147483657
    Traceback (most recent call last):
      File "test.py", line 18, in <module>
        main()
      File "test.py", line 14, in main
        string = comm.bcast(string, root=0)
      File "MPI/Comm.pyx", line 1276, in mpi4py.MPI.Comm.bcast (src/mpi4py.MPI.c:108819)
      File "MPI/msgpickle.pxi", line 612, in mpi4py.MPI.PyMPI_bcast (src/mpi4py.MPI.c:47005)
      File "MPI/msgpickle.pxi", line 119, in mpi4py.MPI.Pickle.dump (src/mpi4py.MPI.c:40840)
      File "MPI/msgbuffer.pxi", line 35, in mpi4py.MPI.downcast (src/mpi4py.MPI.c:29070)
    OverflowError: integer 2147483657 does not fit in 'int'
    ^C[mpiexec@tiger-sumire] Sending Ctrl-C to processes as requested
    [mpiexec@tiger-sumire] Press Ctrl-C again to force abort

I'm going to see if I can work around this and/or reduce the amount of data that's being transferred.
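The `MASTER:` line shows why even a payload of exactly 2**31 - 1 bytes fails: pickling adds a few bytes of opcodes around the data, and 2147483657 - (2**31 - 1) = 10. A Python 3 sketch that measures the same overhead on a small `bytes` payload, assuming protocol 3, whose `BINBYTES` opcode (like `BINSTRING` in the Python 2 run) stores a 4-byte length. `pickle_overhead` is a hypothetical helper:

```python
import pickle

INT_MAX = 2**31 - 1  # limit on MPI message counts (signed 32-bit int)


def pickle_overhead(nbytes, protocol=3):
    """Bytes that pickle adds on top of a bytes payload of length ``nbytes``."""
    return len(pickle.dumps(b"X" * nbytes, protocol)) - nbytes


if __name__ == "__main__":
    overhead = pickle_overhead(1000)
    print("overhead:", overhead)
    # Largest payload whose pickle would still fit in a C int.
    print("max payload:", INT_MAX - overhead)
```

For payloads of 256 bytes or more, the 10 bytes are the protocol marker (2), the opcode plus its 4-byte length (5), a memo entry (2) and the final `STOP` (1).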

Paul Price added a comment - - edited

I can't work around this at the MPI level because pickle apparently is limited to 32-bit integers in python 2 (this is fixed in python 3, with protocol 4; PEP 3154). I tried sending the pickle by chunks smaller than 2**31 bytes, but python simply can't unpickle something whose size doesn't fit into an int, so unless we come up with our own 64-bit serialisation format we can't fix things that way. I'm going to have to go the route of reducing the amount of data that's being transferred.
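For illustration, a Python 3 sketch of the chunking idea combined with a 64-bit length prefix, i.e. the kind of home-grown format mentioned above. The helper names and the 8-byte little-endian header are assumptions, not part of ctrl_pool. It keeps every chunk under the `int` limit; under Python 3 the final `pickle.loads` works, but under Python 2 the reassembled buffer still could not be unpickled, which is why this route was dropped:

```python
import pickle
import struct

INT_MAX = 2**31 - 1           # MPI message counts are C ints
HEADER = struct.Struct("<Q")  # assumed format: 8-byte little-endian length


def to_chunks(obj, limit=INT_MAX):
    """Pickle ``obj``; return a 64-bit length header and a list of
    zero-copy memoryview chunks, each at most ``limit`` bytes long."""
    payload = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
    view = memoryview(payload)
    chunks = [view[i:i + limit] for i in range(0, len(payload), limit)]
    return HEADER.pack(len(payload)), chunks


def from_chunks(header, chunks):
    """Reassemble the chunks, check the declared length, and unpickle."""
    (total,) = HEADER.unpack(header)
    payload = b"".join(chunks)
    if len(payload) != total:
        raise ValueError("expected %d bytes, got %d" % (total, len(payload)))
    return pickle.loads(payload)


if __name__ == "__main__":
    header, chunks = to_chunks(list(range(1000)), limit=100)
    print(len(chunks), "chunks")
    print(from_chunks(header, chunks) == list(range(1000)))
```

In a real transfer the header would be sent first so the receiver knows how many bytes to expect, and each chunk would go out as its own message (for example with mpi4py's buffer-based `Send`/`Recv`).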

John Swinbank added a comment -

Fritz Mueller — following up on our recent discussion, note that this is the most urgent issue we're currently looking at in the (broadly defined) Task Framework. Paul is the expert on this code.

Paul Price added a comment -

I have a workaround that seems to work on Lauren's test case. I split the broadcast:

    self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)

so that the tuple elements are broadcast individually, and I changed the distribution of the initial jobs:

    self.comm.scatter(initial, root=self.rank)

so that the jobs are distributed directly with send rather than scatter. I'll clean this up and then submit it for review tomorrow. In the meantime, I've put the dirty version on the ticket branch in case Lauren MacArthur needs to make progress tonight.
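Both changes rely on the limit applying per message: one pickle of the whole job list (or of the whole `(tags, func, args, kwargs, context)` tuple) can exceed 2**31 bytes even when every element is well below it. A Python 3 sketch comparing the two sizes with the standard `pickle` module; the job payloads are made up:

```python
import pickle


def pickle_size(obj):
    """Size in bytes of ``obj`` pickled with the highest protocol."""
    return len(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))


def largest_message(items):
    """Largest single pickle if each item is sent as its own message."""
    return max(pickle_size(item) for item in items)


if __name__ == "__main__":
    # Made-up per-node job payloads: 8 distinct ~1 MB byte strings.
    jobs = [bytes([i]) * 1_000_000 for i in range(8)]
    print("one pickle of the whole list:", pickle_size(jobs))
    print("largest per-node pickle:     ", largest_message(jobs))
```

The workaround would still fail if a single job, or a single tuple element, pickled to more than 2**31 bytes on its own.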

Lauren MacArthur added a comment -

Thanks, Paul. I won't be rerunning until tomorrow, so I'll try with whatever state the branch is in then.

Paul Price added a comment -

Fritz Mueller, since you're interested, would you be willing to review this fix?

    price@price-laptop:~/LSST/ctrl/pool (tickets/DM-8021=) $ git sub-patch
    commit 9df40a36febd5bbcfd3842356097aaa3ba79860f
    Author: Paul Price <price@astro.princeton.edu>
    Date:   Tue Oct 18 20:19:00 2016 -0400

        Comm: add alternate version of scatter

        The default version apparently pickles the entire 'dataList', which
        can cause errors if the pickle size grows over 2^31 bytes due to
        fundamental problems with pickle in python 2 [1][2] (causing, e.g.,
        "OverflowError: integer 2155421250 does not fit in 'int'"). Instead,
        we send the data to each slave node in turn; this reduces the pickle
        size.

        [1] http://bugs.python.org/issue11564
        [2] https://www.python.org/dev/peps/pep-3154/

    diff --git a/python/lsst/ctrl/pool/pool.py b/python/lsst/ctrl/pool/pool.py
    index 59e1277..ce3d663 100644
    --- a/python/lsst/ctrl/pool/pool.py
    +++ b/python/lsst/ctrl/pool/pool.py
    @@ -300,6 +300,30 @@ class Comm(mpi.Intracomm):
             with PickleHolder(value):
                 return super(Comm, self).bcast(value, root=root)
     
    +    def scatter(self, dataList, root=0, tag=0):
    +        """Scatter data across the nodes
    +
    +        The default version apparently pickles the entire 'dataList',
    +        which can cause errors if the pickle size grows over 2^31 bytes
    +        due to fundamental problems with pickle in python 2. Instead,
    +        we send the data to each slave node in turn; this reduces the
    +        pickle size.
    +
    +        @param dataList  List of data to distribute; one per node
    +            (including root)
    +        @param root  Index of root node
    +        @param tag  Message tag (integer)
    +        @return Data for this node
    +        """
    +        if self.Get_rank() == root:
    +            for rank, data in enumerate(dataList):
    +                if rank == root:
    +                    continue
    +                self.send(data, rank, tag=tag)
    +            return dataList[root]
    +        else:
    +            return self.recv(source=root, tag=tag)
    +
         def Free(self):
             if self._barrierComm is not None:
                 self._barrierComm.Free()
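To see the message pattern of the patched `scatter` without an MPI installation, here is a sketch that runs the same method body on threads, using a hypothetical `FakeComm` whose `send`/`recv` pass objects through in-memory queues instead of pickled MPI messages. It only illustrates that the root sends one message per non-root rank, each holding a single element of `dataList`; it does not exercise mpi4py:

```python
import queue
import threading


class FakeComm:
    """Hypothetical stand-in for an MPI communicator: one inbox per rank.
    Unlike mpi4py's lowercase send/recv, nothing is pickled here."""

    def __init__(self, rank, inboxes):
        self.rank = rank
        self.inboxes = inboxes

    def Get_rank(self):
        return self.rank

    def send(self, obj, dest, tag=0):
        self.inboxes[dest].put((self.rank, tag, obj))

    def recv(self, source, tag=0):
        # Only the root sends in this demo, so the next message in the
        # inbox should be ours; check that it matches anyway.
        src, msg_tag, obj = self.inboxes[self.rank].get()
        if (src, msg_tag) != (source, tag):
            raise RuntimeError("unexpected message from %d (tag %d)" % (src, msg_tag))
        return obj

    def scatter(self, dataList, root=0, tag=0):
        # Same logic as the patched Comm.scatter in the diff.
        if self.Get_rank() == root:
            for rank, data in enumerate(dataList):
                if rank == root:
                    continue
                self.send(data, rank, tag=tag)
            return dataList[root]
        else:
            return self.recv(source=root, tag=tag)


def run_scatter(dataList, root=0):
    """Run scatter on len(dataList) simulated ranks; return what each got."""
    size = len(dataList)
    inboxes = [queue.Queue() for _ in range(size)]
    results = [None] * size

    def worker(rank):
        comm = FakeComm(rank, inboxes)
        # Only the root holds the data, as in the real call.
        results[rank] = comm.scatter(dataList if rank == root else None, root=root)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_scatter(["job0", "job1", "job2", "job3"]))
```

With real mpi4py, each of those `send` calls pickles only one element of `dataList`, so the 2^31-byte limit applies per job rather than to the whole list.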
Lauren MacArthur added a comment -

I can confirm that running with the current ticket branch solves the problem. Thanks Paul!

Paul Price added a comment -

Fritz Mueller indicated that Nate Pease [X] might be a better reviewer.

Nate, would you mind taking a look at this, please?

Nate Pease [X] (Inactive) added a comment -

It looks good to me.

Paul Price added a comment -

Thanks Nate!

Merged to master.

