Memory sharing in multiprocessing
@danschef: No need to answer, I am just logging these here as future enhancements. I hope that is OK.
As far as I understand, global variables can only be shared between multiprocessing processes when using `multiprocessing.set_start_method('fork')`
(which is currently the default method on Linux, but that will change in Python 3.14: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods).
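For reference, here is a minimal standalone sketch of what fork-based inheritance means (plain `multiprocessing`, unrelated to the GeoArray code; `GLOBAL_ARR` and `child` are made-up names). It only runs on POSIX systems, where the `fork` start method is available:

```python
import multiprocessing as mp
import numpy as np

GLOBAL_ARR = np.arange(4)  # lives in the parent's memory


def child(q):
    # under the 'fork' start method the child inherits GLOBAL_ARR as-is;
    # under 'spawn' the module would be re-imported in a fresh interpreter
    q.put(int(GLOBAL_ARR.sum()))


if __name__ == "__main__":
    ctx = mp.get_context("fork")  # 'fork' is only available on POSIX
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(q.get())  # 6
    p.join()
```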
A different way to share the memory would be to back the cached arrays in the `GeoArray`s using `multiprocessing.shared_memory`; there is a NumPy example at https://docs.python.org/3/library/multiprocessing.shared_memory.html.
This could potentially also be an interesting extension of the `GeoArray` class itself.
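As a standalone illustration of that pattern (generic NumPy, not the `GeoArray` API): a NumPy array can be copied into a shared memory block and re-attached elsewhere by name. Note that every NumPy view on the buffer has to be dropped before `.close()`:

```python
import numpy as np
from multiprocessing import shared_memory

# place an array in a shared memory block
data = np.arange(12, dtype=np.float64).reshape(3, 4)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
view[:] = data  # copy into the shared buffer

# another process could attach via the block's name
attached = shared_memory.SharedMemory(name=shm.name)
other = np.ndarray(data.shape, dtype=data.dtype, buffer=attached.buf)
print(np.array_equal(other, data))  # True

del other         # release exported pointers before closing
attached.close()
del view
shm.close()
shm.unlink()      # free the block once nobody needs it anymore
```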
Here's an example that works locally for me:

```python
import multiprocessing
import numpy as np
from multiprocessing.managers import SharedMemoryManager

with SharedMemoryManager() as SMM:

    def create_shared_mem_ndarray(input_array):
        # copy the input array into a manager-owned shared memory block
        shm = SMM.SharedMemory(size=input_array.nbytes)
        shared_buffer_array = np.frombuffer(shm.buf, dtype=input_array.dtype)
        shared_buffer_array = shared_buffer_array[:input_array.size].reshape(input_array.shape)
        shared_buffer_array[:] = input_array[:]
        return shared_buffer_array, shm

    self.ref.arr, self.ref_mem = create_shared_mem_ndarray(self.ref.arr)
    self.shift.arr, self.shift_mem = create_shared_mem_ndarray(self.shift.arr)

    # run co-registration for whole grid
    if self.CPUs is None or self.CPUs > 1:
        if not self.q:
            cpus = self.CPUs if self.CPUs is not None else multiprocessing.cpu_count()
            print("Calculating tie point grid (%s points) using %s CPU cores..." % (len(GDF), cpus))
```
And then no globals are required; `shift` and `ref` and their shared memory handles can simply be passed into the processes using `_get_coreg_kwargs`.
```python
def _get_coreg_kwargs(self, pID, wp):
    return dict(
        ref=self.ref,
        shift=self.shift,
        ref_mem=self.ref_mem,
        shift_mem=self.shift_mem,
        # ... (remaining keyword arguments unchanged)
    )
```
```python
@staticmethod
def _get_spatial_shifts(coreg_kwargs):
    separate_args = ['ref', 'shift', 'pointID', 'fftw_works', 'ref_mem', 'shift_mem']
    CR = COREG(coreg_kwargs['ref'], coreg_kwargs['shift'], CPUs=1,
               **{k: v for k, v in coreg_kwargs.items() if k not in separate_args})
    CR.fftw_works = coreg_kwargs['fftw_works']
    CR.calculate_spatial_shifts()

    # fetch results
    last_err = CR.tracked_errors[-1] if CR.tracked_errors else None
    win_sz_y, win_sz_x = CR.matchBox.imDimsYX if CR.matchBox else (None, None)
    CR_res = [win_sz_x, win_sz_y, CR.x_shift_px, CR.y_shift_px, CR.x_shift_map, CR.y_shift_map,
              CR.vec_length_map, CR.vec_angle_deg, CR.ssim_orig, CR.ssim_deshifted, CR.ssim_improved,
              CR.shift_reliability, last_err]
    del CR

    coreg_kwargs['ref_mem'].close()
    coreg_kwargs['shift_mem'].close()

    return [coreg_kwargs['pointID']] + CR_res
```
Note that I am also passing a handle to the shared memory instance into the process, so I can call `.close()` on it. The shared memory manager takes care of calling `.unlink()` at the end of the multiprocessing block.
Not doing so causes warnings like this one:
```
unraisableexception.py:80: PytestUnraisableExceptionWarning: Exception ignored in: <function SharedMemory.__del__ at 0x12eb2b5b0>

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/envs/arosics_test/lib/python3.10/multiprocessing/shared_memory.py", line 187, in __del__
    self.close()
  File "/opt/homebrew/Caskroom/miniconda/base/envs/arosics_test/lib/python3.10/multiprocessing/shared_memory.py", line 230, in close
    self._mmap.close()
BufferError: cannot close exported pointers exist

warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
```
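The safe teardown order can be reproduced in isolation (a generic NumPy sketch, not the arosics code): drop every NumPy view on the buffer before calling `.close()`, and let the manager handle `.unlink()` when its context exits:

```python
import numpy as np
from multiprocessing.managers import SharedMemoryManager

with SharedMemoryManager() as smm:
    shm = smm.SharedMemory(size=4 * 8)  # room for four int64 values
    arr = np.ndarray((4,), dtype=np.int64, buffer=shm.buf)
    arr[:] = [1, 2, 3, 4]
    total = int(arr.sum())
    del arr      # drop the NumPy view so no exported pointers remain
    shm.close()  # safe now: closing with live views would raise BufferError
    # the manager calls .unlink() itself when the 'with' block exits
print(total)  # 10
```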