BlockSorting を Groovy で

今年も半分が終わろうとしているが1月の TODO を解決する。

Suffix Array ではなく BWT の方を。

Groovy で実装してみる

参考


Ruby 版をそのまま Groovy へ

def encode(src) {
  int n = src.size()
  def srcsrc = src + src
  def vars = (0..<n).collect{ srcsrc[it..<it+n] }
  def code = vars.sort(true).collect{ it[-1] }
  return [code.join(), vars.indexOf(src)]
}

def decode(code, int index) {
  int n = code.size()
  def pairs  = [code.toList(), 0..<n].transpose()
  // stable sort
  def sorted = pairs.sort{ a, b -> a[0] <=> b[0] ?: a[1] <=> b[1] }
  def result = []
  n.times{
    result << sorted[index][0]
    index   = sorted[index][1]
  }
  return result.join()
}

assert encode("abracadabra") == ["rdarcaaaabb", 2]
assert decode("rdarcaaaabb", 2) == "abracadabra"

BWT とは?

Burrows-Wheeler Transform の略でブロックソートとも呼ばれているみたい。
圧縮前に行う変換処理で BWT 自体は圧縮しないが、圧縮しやすいように並べ替えてくれる。
先程実装したものを使って確認してみる。

assert encode("abcabc") == ["ccaabb", 0]
assert encode("abccba") == ["bacacb", 1] 

1つ目のようにパターンに同じようなパターンが表れると変換後連続するようになる。
2つ目のような場合はばらばらのまま。
不思議なことにこの変換後の文字列と数値から元の文字列へ戻すことができる。

どうして decode できるのか?

Wikipedia の説明が分かりやすいのそっちを読んだ方がいいかも


ここでは、例としてよくない気がするが "abcde" を変換する様子を見ていく。

巡回行列をつくる。

def matrix = [
  "abcde",
  "eabcd",  // 1行目を rotate 1
  "deabc",  // 2行目を rotate 1
  "cdeab",  // 3行目を rotate 1
  "bcdea"   // 4行目を rotate 1
]


第1列でソートする。
第1列でソートしたとき最終列は e,a,b,c,d である。
これが encode の戻り値でもう1つの戻り値の数値は元の文字列の行インデックスを返している。

assert matrix.sort{ it[0] } == [
  "abcde",
  "bcdea",
  "cdeab",
  "deabc",
  "eabcd"
]

assert encode("abcde") == ["eabcd", 0]


他の列でもソートしてみる。
巡回しているのでソートした列の1つ前の列は e,a,b,c,d である。

// 第2列でソートしたとき第1列は e,a,b,c,d である
assert matrix.sort{ it[1] } == [
  "eabcd",
  "abcde",
  "bcdea",
  "cdeab",
  "deabc",
]
// 第3列でソートしたとき第2列は e,a,b,c,d である
// 第4列でソートしたとき第3列は e,a,b,c,d である
// 第5列でソートしたとき第4列は e,a,b,c,d である


この性質を使って、最終列の文字列から巡回行列をソートした状態をつくることができる。

// 1. 第1列を e,a,b,c,d にし、第2列をソートされた文字列にする
matrix = [
  "ea...",
  "ab...",
  "bc...",
  "cd...",
  "de...",
]

// 2. 第1列でソートする。最終列は、e,a,b,c,d になるはず
matrix = [
  "ab..e",
  "bc..a",
  "cd..b",
  "de..c",
  "ea..d",
]

// 3. 全行 rotate 1 すれば 1. に戻る

巡回行列をソートした状態の何番目に元の文字列があるかはわからないのでインデックスを使う。

ソートを1回だけにする

先程の工程をそのまま実装すると n 回のソートが必要になる

def process1(input) {
  def n = input.size()
  def output = [[]] * n
  n.times {
    output = [input, output].transpose().sort{ it[0] }.collect{ h, t -> [h, *t] }
  }
  return output
}


同じ文字列をソートしているので、すべてのソートは同じ変換になる。
巡回しているのでそれ以降が既にソートされていることも重要である。
このためソートは常に安定ソートになる。
ソートで行う行の入れ替えを transform に覚えておけばソートは1回で済む。

def process2(input) {
  def n = input.size()
  def output = [[]] * n
  def transform = [input, 0..<n].transpose().sort{
    a, b -> a[0] <=> b[0] ?: a[1] <=> b[1]
  }.transpose()[1]
  n.times {
    output = [input, output].transpose()
    output = (0..<n).collect{ output[transform[it]] }.collect{ h, t -> [h, *t] }
  }
  return output
}


最終的な行番号が分かっていれば、その行だけ変換するようにできる。

// trace は対象行だけの処理を行う
def process3(input) {
  def n = input.size()
  def transform = [input, 0..<n].transpose().sort{
    a, b -> a[0] <=> b[0] ?: a[1] <=> b[1]
  }.transpose()[1]
  def trace = { i ->
    def result = []
    n.times{
      i = transform[i]
      result << input[i]
    }
    return result
  }
  return (0..<n).collect{ trace(it) }
}

assert process1("rdarcaaaabb" as List) == process2("rdarcaaaabb" as List)
assert process1("rdarcaaaabb" as List) == process3("rdarcaaaabb" as List)

decode 時には行番号が分かるのでその行だけ追いかければよい。
これで最初に実装した処理の意味が分かった。

行番号の代わりに終端文字を使う実装

文字列の最後に終端を示す文字を加えておけば、復元した行列から最後が終端文字の文字列が元の文字列であると分かる。
ただし、終端を示す文字は元の文字列に含まれていないこと。


英語版の Wikipedia に載っているものを Groovy で実装してみる。

SEP = '\0'

def bwt(s) {
  assert !s.contains(SEP)
  s += SEP
  def table = (0..<s.size()).collect{ s[it..-1] + s[0..<it] }.sort(true)
  return table.collect{ it[-1] }.join()
}

def ibwt(r) {
  def table = [""] * r.size()
  r.size().times {
    table = (0..<r.size()).collect{ r[it] + table[it] }.sort()
  }
  return table.find{ it[-1] == SEP }.replaceFirst(~/.$/, "")
}

assert "cacao" == ibwt(bwt("cacao"))

参考