Scalaz 7 zipWithİndex ile kaçınarak bellek sızıntısı/grup enumeratees

Arka plan

Olarak Scalaz 7 iteratees sürekli yığın alanı verileri (yani sınırsız) geniş bir akışı işlemek için kullanıyorum this question, kaydetti.

Benim kod bu gibi görünüyor:

type ErrorOrT[M[ _], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs    vs map { 
      case (c, i) => processChunk(c, i) 
  } &= (data.zipWithIndex mapE


Bir bellek sızıntısı içine çalıştırmak gibi görünüyor, ama hata benim kod Scalaz ya da olup olmadığını bilmek Scalaz/FP yeterince aşina değilim. Sezgisel olarak, bu kod sadece (sipariş üzerine) gerektirir bekliyorumPkat Chunkboyutlu uzay.

Not: OutOfMemoryError Bir karşılaşıldı, ama benim kod consume kullanarak a similar question buldum.


Bazı testler deneyin ve sorunu izole etmek için koştum. Özetlemek gerekirse, sızıntı sadece zipWithIndex group ikisi de kullanıldığında ortaya çıkacak gibi görünüyor.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Testler için kod:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c   a.length 

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c 

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c   v._1.length


  • Hata kodum mu?
  • Nasıl sabit yığın alanı içinde bu işi yapabilir miyim?

