코틀린을 다루는 기술 8장을 읽고 정리한 내용입니다.
요약된 표현이 많기에 보다 자세한 설명은 책을 통해 확인하실 수 있습니다.

이번 장의 목표

  • 6장에서 구현한 함수 중 일부 함수를 더욱 효율적으로 구현하는 기법을 알아본다.
  • 리스트 연산을 자동으로 병렬화해 최근 컴퓨터에 탑재되는 멀티 코어 아키텍처의 장점을 살리는 법을 살펴본다.

이 장에서 다루는 내용

  • 메모리로 리스트 처리 속도 높이기
  • List와 Result 합성하기
  • 인덱스로 리스트 접근 구현하기
  • 리스트 펼치기
  • 자동으로 리스트 병렬 처리하기

앞 장에서 구현한 코드를 기반으로 내용이 유기적으로 이어지고 있다.

8.1 length의 단점

foldLeft 함수는 리스트의 모든 원소를 연속적으로 합성한다. 이로인해 전체 실행 시간은 리스트의 길이와 비례한다. 이 연산을 더 빠르게, 아니면 더 빨라 보이게 만드는 방법이 있을까?

 

리스트 원소 타입이 중요하지 않은 연산의 예시

  • 리스트의 해시 코드 : 모든 리스트 원소의 해시 코드를 모두 더해 계산한다.
  • toString이 반환하는 리스트의 문자열 : 모든 리스트 원소의 toString을 합성해 계산한다.
  • 원소 타입의 특성에 의존하되 구체적인 타입에 의존하지 않을 경우 (Conparable, Comparator, Summable 등)

8.2 성능 문제

  • 접기 연산의 비효율성 : 모든 원소를 하나하나 센다는 것
  • 개선 방안 : 원소 개수를 어딘가에 저장해두고, 리스트에 원소를 추가할 때마다 1을 더하면 되지 않을까? (ex. length)
    • 이때 메모화된 값을 저장하는 위치 : 리스트 자체

8.3 메모화의 이점

  • 리스트 내부에 원소 개수를 유지하기 위해 필요한 비용 vs 저장하지 않을 때 발생하는 비효율적인 연산 비용
  • 만약 length를 2번 이상 호출하면 시간적 손해가 확실하게 상쇄된다.

8.3.1 메모화의 단점 처리하기

메모화의 단점

  1. 원소 삽입에 드는 시간이 약간 늘어난다. 하지만 O(n) 시간이 걸리는 함수를 O(1)의 상수 시간으로 만들 수 있다는 점을 고려해보면, 처리 시간 측면에서는 따르는 이익이 더욱 크다.
  2. 메모리 사용량이 증가한다. 제자리에서 상태를 변이하는 데이터 구조에서는 이런 문제가 없다. 하지만 불변 리스트는 원소마다 길이를 메모화 해야한다. 5장에서 구현한 단일 연결 리스트는 length 정보를 추가하면 데이터 구조의 크기가 30% 정도(!) 늘어난다.

데이터 구조에 메모화 적용 여부는 1) 함수를 사용할 때 결과 객체를 매번 다시 만들지 않아도 되는지 2) 함수가 호출되는 빈도 등을 고려해 결정해야 한다. 5장에서 만들었던 단일 연결 리스트에 메모화된 length 함수를 작성해보자.

  • Nil (데이터가 없음을 나타내는 클래스)에서는 0을 반환

      override fun lengthMemorized() = 0
  • Cons에서는 클래스에 메모화를 위한 필드를 추가

      // 메모화를 위한 프로퍼티를 추가
      private val length: Int = tail.lengthMenorized + 1
    
      // ...
      override fun lengthMemorized() = length

8.3.2 성능 향상 평가하기

  • 백만개 기준) 메모리의 경우 : 가용 메모리 약 20~25MB 감소
  • 백만개 기준) 처리 시간의 경우 : 10번 호출 시 200 밀리초 소요 → 메모화 적용 후 거의 0초에 가깝게 소요

8.4 List와 Result의 합성

8.4.1 Result를 반환하는 리스트 처리하기

1) List의 Nil 요소에 접근해서 예외가 발생하지 않도록, 2) 실패나 결과가 없을 때 사용할 기본 값을 제공해 값에 안전하게 접근할 수 있도록, List의 head를 읽을 때 Result를 반환할 수 있다. List에서 Result를 반환하는 함수를 추가해보자.

// Nil에서의 구현
override fun headSafe(): Result<Nothing> = Result()

// Cons에서의 구현
override fun headSafe(): Result<A> = Result(head)

// 마지막 원소를 반환하기
fun lastSafe() = foldLeft(Result()) { _: Result<A> -> { y: A -> Result(y) } }

8.4.2 List를 Result로 변환하기

나중에 내용 보충~

계산 결과로 List 타입을 반환받으려면, List<Result> 타입의 값을 List 타입으로 바꾸는 방법이 필요하다. flatMap과의 차이점은 List와 Result라는 두 가지 다른 데이터 타입이 연관되어 있다는 점이다.

  • 실패, 빈 결과를 모두 버리고 성공한 결과만 모아 U 타입을 원소로 하는 리스트를 만든다
    • 모든 결과가 선택적일 때 결과의 리스트를 만든다 : 성공이 하나도 없으면 빈 리스트를 반환
    • 결과가 성골하려면 하나 이상 성공적인 결과가 필요하다 : 성공이 하나도 없으면 실패를 반환
  • 모든 결괏값이 필요하다 : 전체 연산이 성공해 모든 원소가 성공인지 결정한다. 모두 성공이면 Success<List<U>>를, 하나 이상 실패나 빈 결과가 있으면 Failure<List<U>>를 반환

8.5 리스트에 대한 일반적인 추상화

List 타입을 사용하는 여러 경우를 추상화하면 같은 코드를 재사용할 수 있다.

8.5.1 리스트를 묶거나(zip) 풀기(unzip)

  • 묶기 : 묶기는 두 리스트에서 같은 인덱스에 위치한 원소를 하나로 묶는 함수
  • 풀기 : 리스트의 원소를 분해해 두개의 리스트를 만드는 함수

8.5.2 인덱스로 원소 접근하기

단일 연결 리스트는 원소에 인덱스로 접근하기에 적절한 자료구조는 아니지만, 때로는 인덱스를 통한 접근이 필요할 때가 있다. Stack Overflow 발생을 막기 위해 공재귀를 사용하는 것이 좋다.

fun getAt(index: Int): Result<A> {
        val p: (Pair<Result<A>, Int>) -> Boolean = { it.second < 0 }
        return Pair<Result<A>, Int>(Result.failure("Index out of bound"), index).let { identity ->
            if (index < 0 || index >= length())
                identity
            else
                foldLeft(identity, p) { ta: Pair<Result<A>, Int> ->
                    { a: A ->
                        if (p(ta))
                            ta
                        else
                            Pair(Result(a), ta.second - 1)
                    }
                }

        }.first
    }

정해진 인덱스에 도달하면 (= 결과를 찾으면) 연산을 즉시 종료할 필요성이 있다.

8.5.3 리스트 나누기

특정 위치에서 리스트를 둘로 나눠야 할 때가 있다. (이후에 소개할 병렬 처리에서 이런 연산이 필요하다)

fun splitAt(index: Int): Pair<List<A>, List<A>> {
        tailrec fun splitAt(acc: List<A>,
                            list: List<A>, i: Int): Pair<List<A>, List<A>> =
            when (list) {
                Nil -> Pair(list.reverse(), acc)
                is Cons ->
                    if (i == 0)
                        Pair(list.reverse(), acc)
                    else
                        splitAt(acc.cons(list.head), list.tail, i - 1)
            }
        return when {
            index < 0        -> splitAt(0)
            index > length() -> splitAt(length())
            else             -> splitAt(Nil, this.reverse(), this.length() - index)
        }
    }

8.5.4 부분 리스트 검색하기

어떤 리스트가 다른 리스트 안에 포함되어 있는지 찾아내는 연산을 구현한다.

    // 어떤 리스트가 다른 리스트의 시작 부분에 위치하는지 검사
    fun startsWith(sub: List<@UnsafeVariance A>): Boolean {
        tailrec fun startsWith(list: List<A>, sub: List<A>): Boolean =
                when (sub) {
                    Nil  -> true
                    is Cons -> when (list) {
                        Nil  -> false
                        is Cons ->
                            if (list.head == sub.head)
                                startsWith(list.tail, sub.tail)
                            else
                                false
                    }
                }
        return startsWith(this, sub)
    }

    fun hasSubList(sub: List<@UnsafeVariance A>): Boolean {
        tailrec
        fun <A> hasSubList(list: List<A>, sub: List<A>): Boolean =
                when (list) {
                    Nil -> sub.isEmpty()
                    is Cons ->
                        if (list.startsWith(sub))
                            true
                        else
                            hasSubList(list.tail, sub)
                }
        return hasSubList(this, sub)
    }

8.6 리스트를 자동으로 병렬 처리하기

접기(fold)는 리스트의 원소 개수만큼 주어진 연산을 반복 적용한다. 요즘 대부분의 컴퓨터에는 멀티 코어 프로세서가 들어 있으므로 컴퓨터가 리스트를 병렬로 처리하게 할 수 있다. 접기 연산을 병렬화 하기 위해서는 모든 병렬 계산 결과를 하나로 조합할 수 있는 연산이 필요하다.

8.6.1 모든 계산을 병렬화할 수 없다

  • 병렬화 가능 : 리스트 원소의 합계 구하기
  • 병렬화 불가능 : 리스트 원소의 평균 구하기

8.6.2 리스트를 부분 리스트로 나누기

먼저 리스트를 부분 리스트로 나눈다. 리스트를 나누는 개수는 프로세서 수가 가장 중요한 요인이다. 이때 모든 부분 리스트 계산에 소요 시간이 동일하기 않을 수 있기 때문에, 리스트를 많은 부분 리스트로 나누고 각각의 부분 리스트를 Thread pool에 전달하는 것이 좋다. 부분 리스트에 대한 처리를 마친 쓰레드가 즉시 다른 리스트를 처리할 수 있다.

이를 위해 리스트를 부분 리스트로 나누는 연산이 필요하다. 아래 코드에서 부분 리스트를 둘로 나누는 과정이 재귀적으로 수행된다.

fun splitListAt(index: Int): List<List<A>> {
        tailrec fun splitListAt(acc: List<A>,
                            list: List<A>, i: Int): List<List<A>> =
            when (list) {
                Nil -> List(list.reverse(), acc)
                is Cons ->
                    if (i == 0)
                        List(list.reverse(), acc)
                    else
                        splitListAt(acc.cons(list.head), list.tail, i - 1)
            }
        return when {
            index < 0        -> splitListAt(0)
            index > length() -> splitListAt(length())
            else             -> splitListAt(Nil, this.reverse(), this.length() - index)
        }
    }

    fun divide(depth: Int): List<List<A>> {
        tailrec
        fun divide(list: List<List<A>>, depth: Int): List<List<A>> =
                when (list) {
                    Nil -> list // dead code
                    is Cons ->
                        if (list.head.length() < 2 || depth < 1)
                            list
                        else
                            divide(list.flatMap { x -> x.splitListAt(x.length() / 2) }, depth - 1)
                }
        return if (this.isEmpty())
            List(this)
        else
            divide(List(this), depth)
    }

8.6.3 부분 리스트를 병렬로 처리하기

부분 리스트를 병렬로 처리하려면 ExecutorService를 인자로 받는 특별한 접기 함수가 필요하다.

  1. divide로 부분 리스트를 적절히 나누고
  2. ExecutorService에 task를 넘겨 부분 리스트로 구성된 리스트를 처리한다.
        fun <B> parFoldLeft(es: ExecutorService,
                        identity: B,
                        f: (B) -> (A) -> B,
                        m: (B) -> (B) -> B): Result<B> =
        try {
            val result: List<B> = divide(1024).map { list: List<A> ->
                es.submit<B> { list.foldLeft(identity, f) }
            }.map<B> { fb ->
                try {
                    fb.get()
                } catch (e: InterruptedException) {
                    throw RuntimeException(e)
                } catch (e: ExecutionException) {
                    throw RuntimeException(e)
                }
            }
            Result(result.foldLeft(identity, m))
        } catch (e: Exception) {
            Result.failure(e)
        }

접기 연산을 사용하지 않고 직접 병렬 매핑을 구현할 수도 있다.

fun <B> parMap(es: ExecutorService, g: (A) -> B): Result<List<B>> =
        try {
            val result = this.map { x ->
                es.submit<B> { g(x) } // 함수 적용을 map에 넘기고
            }.map<B> { fb ->
                try {
                    fb.get() // 각각의 결과를 가져옴
                } catch (e: InterruptedException) {
                    throw RuntimeException(e)
                } catch (e: ExecutionException) {
                    throw RuntimeException(e)
                }
            }
            Result(result)
        } catch (e: Exception) {
            Result.failure(e)
        }