Generator in Groovy

同期する Generator を java.util.concurrent パッケージを使って書いてみる。

Using BlockingQueue

はじめに http://groovy.codehaus.org/Iterator+Tricks のように非同期のものを BlockingQueue で書いてみる。

def generate(Closure generator) {
  return new Iterable() {
    @Override Iterator iterator() {
      def queue = new java.util.concurrent.LinkedBlockingQueue(1)
      def done  = false
      def yield = { queue.put(it) }
      def worker = new Thread({
        try {
          generator(yield)
        } catch (InterruptedException ignore) {
        } finally {
          done = true
        }
      })
 
      worker.daemon = true
      worker.start()
      return new Iterator() {
        @Override void finalize() {
          worker.interrupt()
        }
 
        @Override boolean hasNext() {
          return !done || !queue.empty
        }
 
        @Override def next() {
          if (hasNext()) {
            try {
              return queue.take()
            } catch (InterruptedException ignore) {}
          }
          throw new NoSuchElementException()
        }
 
        @Override void remove() {
          throw new UnsupportedOperationException()
        }
      }
    }
  }
}
def fibo(yield) {
  def a = 0
  def b = 1
  while (true) {
    yield(b)
    (a, b) = [b, a+b]
  }
}

assert generate(this.&fibo).find{ it % 20 == 0 } == 832040

フィボナッチ数を求めるような場合は問題ないが PythonJavaScript の yield と同じようには扱えない。

なぜ同じように扱えないのか?

問題点その1 iterator() を呼び出した時点で worker thread が起動する

def counter = generate { yield ->
  for (i = 0; ; i++) {
    println "put $i"
    yield(i)
  }
}.iterator()
// iterator() を呼び出した時点で put している
// put 0
// put 1


問題点その2 put が先行する

def counter = generate { yield ->
  for (i = 0; ; i++) {
    println "put $i"
    yield(i)
  }
}.iterator()
5.times {
  println "get ${counter.next()}"
}
// get の回数より put の回数が多い
// put 0
// get 0
// put 1
// put 2
// put 3
// get 1
// get 2
// get 3
// put 4
// put 5
// put 6
// get 4


問題点その3 BlockingQueue に null を渡せない

for (x in generate{ it(null) }) println x
// Exception in thread "Thread-2" java.lang.NullPointerException

Using CountDownLatch

問題点1を解決する。
CountDownLatch は合図があるまでスレッドを待機させることができるのでこれを使う。
Worker Thread をスレッド起動直後に待機させ、最初の hasNext() 呼び出しで合図を送って開始させる。

def generate(Closure generator) {
  return new Iterable() {
    @Override Iterator iterator() {
      def queue  = new java.util.concurrent.LinkedBlockingQueue(1)
      def signal = new java.util.concurrent.CountDownLatch(1)
      def start  = false
      def done   = false
      def yield  = { queue.put(it) }
      def worker = new Thread({
        try {
          signal.await()
          generator(yield)
        } catch (InterruptedException ignore) {
        } finally {
          done = true
        }
      })
  
      worker.daemon = true
      worker.start()
      return new Iterator() {
        @Override void finalize() {
          worker.interrupt()
        }
  
        @Override boolean hasNext() {
          if (!start) {
            start = true
            signal.countDown()
          }
          return !queue.empty || !done
        }
  
        @Override def next() {
          if (hasNext()) {
            try {
              return queue.take()
            } catch (InterruptedException ignore) {}
          }
          throw new NoSuchElementException()
        }
  
        @Override void remove() {
          throw new UnsupportedOperationException()
        }
      }
    }
  }
}

Using SynchronousQueue & Semaphore

問題点2を解決する。
BlockingQueue の実装クラスには SynchronousQueue という常に空のキューがある。
常に空でキューなのかということだが put したと同時に take するので常に空ということらしい。
これで解決したかと思ったのだけども LinkedBlockingQueue で1個置き場所があって最大2個ずれていたので置き場所がなくなっても1個ずれる。
結局、Semaphore を使ってスレッドが同時に2つ動作しないように管理しないといけない。
Semaphore は開始時の制御にも使えるので CountDownLatch はもう使用しない。
Worker Thread の開始直後と、put 直後は Worker Thread を一度止めて、次の hasNext が呼ばれるまで待機するようにする。

def generate(Closure generator) {
  return new Iterable() {
    @Override Iterator iterator() {
      def queue = new java.util.concurrent.SynchronousQueue()
      def lock  = new java.util.concurrent.Semaphore(0)
      def value = []
      def DONE  = new Object()
      def yield = {
        queue.put(it)
        lock.acquire()
      }
      def worker = new Thread({
        try {
          lock.acquire()
          generator(yield)
        } catch (InterruptedException ignore) {
        } finally {
          queue.put(DONE)
        }
      })
    
      worker.daemon = true
      worker.start()
      return new Iterator() {
        @Override void finalize() {
          worker.interrupt()
        }
    
        @Override boolean hasNext() {
          if (value.empty) {
            try {
              lock.release()
              value[0] = queue.take()
            } catch (InterruptedException e) {
              value[0] = DONE
            }
          }
          return value[0] != DONE
        }
  
        @Override def next() {
          if (hasNext()) return value.remove(0)
          throw new NoSuchElementException()
        }
    
        @Override void remove() {
          throw new UnsupportedOperationException()
        }
      }
    }
  }
}

Using Two Semaphores

問題点3を解決する。
そもそも BlockingQueue には null を渡すことができない。
変わりに LinkedList を使い BlockingQueue が行っていた同期処理をもうひとつの Semaphore で行う。

def generate(Closure generator) {
  return new Iterable() {
    @Override Iterator iterator() {
      def queue   = new LinkedList()  // size == 0 or 1
      def getLock = new java.util.concurrent.Semaphore(0)
      def putLock = new java.util.concurrent.Semaphore(0)
      def done    = false
      def yield = {
        queue.addLast(it)
        getLock.release()
        putLock.acquire()
      }
      def worker = new Thread({
        try {
          putLock.acquire()
          generator(yield)
        } catch (InterruptedException ignore) {
        } finally {
          done = true
          getLock.release()
        }
      })
    
      worker.daemon = true
      worker.start()
      return new Iterator() {
        @Override void finalize() {
          worker.interrupt()
        }
    
        @Override boolean hasNext() {
          if (!done && queue.empty) {
            try {
              putLock.release()
              getLock.acquire()
            } catch (InterruptedException e) {
              worker.interrupt()
              return false
            }
          }
          return !done
        }
    
        @Override def next() {
          if (hasNext()) return queue.removeFirst()
          throw new NoSuchElementException()
        }
    
        @Override void remove() {
          throw new UnsupportedOperationException()
        }
      }
    }
  }
}

同期しながら動作し、null を渡すことも可能

def counter = generate { yield ->
  for (i = 0; ; i++) {
    println "put $i"
    yield(i)
  }
}.iterator()
5.times {
  println "get ${counter.next()}"
}
// put と get は1対1で同期している
// put 0
// get 0
// put 1
// get 1
// put 2
// get 2
// put 3
// get 3
// put 4
// get 4

assert generate{ it(null) }).iterator().next() == null

使ってみる

def counter = generate { yield ->
  (1..5).each{ yield(it) }
}

for (i in counter) println i

// counter は Iterable なので再利用可能
for (i in counter) println i

// リスト内包表記のようにも書ける
println generate{ for (x in 1..3) for (y in 1..3) it(x*y) }.iterator().take(10).collect()

// この場合は iterator の方が都合がよいのでメソッドを定義する
def gen(Closure generator) {
  generate(generator).iterator()
}

println gen{ for (x in 1..3) for (y in 1..3) it(x*y) }.take(10).collect()

// ただし、これは Ruby の tap のようなものがあれば同じように書けてしまう
def tap(obj, Closure doto) {
  obj.with(doto)
  return obj
}

println tap([]){ for (x in 1..3) for (y in 1..3) it << x*y }

終わりに

Generator を実装するのと concurrent パッケージを使用するという両方の目的で書いてみた。