マルチスレッドとAtomic変数について

自己紹介

アジェンダ

  • まずは基本
  • アトミック変数
  • v1.7から@atomicマクロ
  • 実践

まずは基本

スレッド数

デフォルトではスレッド数が1なので、起動時に --threads オプション(または環境変数 JULIA_NUM_THREADS)でスレッド数を指定する。

$ julia

julia> Threads.nthreads()
1

$ julia --threads 4

julia> Threads.nthreads()
4

@threadsマクロ

@threadsマクロ

# Run a `Threads.@threads` loop over 1:10 and print, for every iteration, the
# id of the thread that executed it together with the loop number.
function main()
  Threads.@threads for n in 1:10
    tid = Threads.threadid()
    println("(threadid, loop_number) = ($(tid), $(n))")
  end
end
julia> main()
(threadid, loop_number) = (2, 4)
(threadid, loop_number) = (2, 5)
(threadid, loop_number) = (2, 6)
(threadid, loop_number) = (3, 7)
(threadid, loop_number) = (3, 8)
(threadid, loop_number) = (1, 1)
(threadid, loop_number) = (1, 2)
(threadid, loop_number) = (4, 9)
(threadid, loop_number) = (1, 3)
(threadid, loop_number) = (4, 10)

Threads.foreach関数

Threads.foreach関数

# Build a `Channel{Int64}` whose producer puts the integers 1 through 10 into
# it, in increasing order, and return that channel.
function chnl()
  return Channel{Int64}() do out
    foreach(n -> put!(out, n), 1:10)
  end
end

# Consume every value produced by `chnl()` through `Threads.foreach` and print
# the id of the thread that handled it together with the value.
function main()
  Threads.foreach(chnl()) do value
    tid = Threads.threadid()
    println("(threadid, loop_number) = ($(tid), $(value))")
  end
end
julia> main()
(threadid, loop_number) = (1, 1)
(threadid, loop_number) = (2, 2)
(threadid, loop_number) = (1, 3)
(threadid, loop_number) = (3, 5)
(threadid, loop_number) = (4, 4)
(threadid, loop_number) = (1, 7)
(threadid, loop_number) = (4, 8)
(threadid, loop_number) = (3, 6)
(threadid, loop_number) = (4, 9)
(threadid, loop_number) = (2, 10)

@spawnマクロ

@spawnマクロ

# For each loop number from 1 to 10, spawn a task that prints the executing
# thread id and the loop number, and wait for that task before moving on to
# the next loop number.
function main()
  for n in 1:10
    task = Threads.@spawn println("(threadid, loop_number) = ($(Threads.threadid()), $(n))")

    # Block until the task is done; use `fetch` instead when the task
    # returns a value that is needed.
    Threads.wait(task)
  end
end
julia> main()
(threadid, loop_number) = (2, 1)
(threadid, loop_number) = (1, 2)
(threadid, loop_number) = (1, 3)
(threadid, loop_number) = (1, 4)
(threadid, loop_number) = (4, 5)
(threadid, loop_number) = (4, 6)
(threadid, loop_number) = (4, 7)
(threadid, loop_number) = (4, 8)
(threadid, loop_number) = (1, 9)
(threadid, loop_number) = (1, 10)

Synchronization: Threads.Condition

Synchronization: Threads.Condition

# Guard a critical section inside a `Threads.@threads` loop with a
# `Threads.Condition` used as a lock. Each iteration acquires the lock, prints
# a "lock:" line, sleeps for as many seconds as its thread id, prints an
# "unlock:" line and releases the lock.
function main()
  cond = Threads.Condition()

  Threads.@threads for n in 1:10
    tid = Threads.threadid()
    # The sleep duration in seconds equals the id of the executing thread.
    status = "(threadid, loop_number, sleeptime) = ($(tid), $(n), $(tid)sec)"

    lock(cond)
    try
      println("lock:   $(status)")
      sleep(tid)
    finally
      # Runs even if the body throws, so the lock is released again.
      println("unlock: $(status)")
      unlock(cond)
    end
  end
end

Synchronization: Threads.Condition

julia> main()
lock:   (threadid, loop_number, sleeptime) = (1, 1, 1sec)
unlock: (threadid, loop_number, sleeptime) = (1, 1, 1sec)
lock:   (threadid, loop_number, sleeptime) = (1, 2, 1sec)
unlock: (threadid, loop_number, sleeptime) = (1, 2, 1sec)
lock:   (threadid, loop_number, sleeptime) = (1, 3, 1sec)
unlock: (threadid, loop_number, sleeptime) = (1, 3, 1sec)
lock:   (threadid, loop_number, sleeptime) = (2, 4, 2sec)
unlock: (threadid, loop_number, sleeptime) = (2, 4, 2sec)
lock:   (threadid, loop_number, sleeptime) = (2, 5, 2sec)
unlock: (threadid, loop_number, sleeptime) = (2, 5, 2sec)
lock:   (threadid, loop_number, sleeptime) = (2, 6, 2sec)
unlock: (threadid, loop_number, sleeptime) = (2, 6, 2sec)
lock:   (threadid, loop_number, sleeptime) = (4, 9, 4sec)
unlock: (threadid, loop_number, sleeptime) = (4, 9, 4sec)
lock:   (threadid, loop_number, sleeptime) = (4, 10, 4sec)
unlock: (threadid, loop_number, sleeptime) = (4, 10, 4sec)
lock:   (threadid, loop_number, sleeptime) = (3, 7, 3sec)
unlock: (threadid, loop_number, sleeptime) = (3, 7, 3sec)
lock:   (threadid, loop_number, sleeptime) = (3, 8, 3sec)
unlock: (threadid, loop_number, sleeptime) = (3, 8, 3sec)

アトミック変数

  • 不可分、原子的
  • アトミック操作: 途中で別の操作に割り込まれない
  • アトミック操作が保証された変数

v1.7から@atomicマクロ

Threads.Atomic

Threads.Atomic

プリミティブな「boolean, integer, float」のみ

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5

@atomicマクロ

@atomicマクロ

https://docs.julialang.org/en/v1/manual/multi-threading/#man-atomics

構造体のフィールドをatomic変数として定義可能

# Mutable wrapper around a single value of type `T` whose field `x` is
# declared `@atomic`, so updates to it go through the `@atomic` macro.
mutable struct AtomicStruct{T}
  @atomic x::T
end

# Increment two counters 1000 times from a `Threads.@threads` loop: an
# `@atomic` struct field and a plain local variable. Returns both totals as
# the tuple `(atomic_total, plain_total)`.
function main()
  counter = AtomicStruct(0)
  plain = 0

  Threads.@threads for k in 1:1000
    @atomic counter.x += 1 # atomic read-modify-write
    plain += 1             # unsynchronized update; increments may be lost
  end

  return (counter.x, plain)
end
julia> main()
(1000, 906)

実践

よくある円周率をもとめるMC法の実装
https://kaityo256.github.io/sevendayshpc/day3/index.html

using Random

# Mutable hit counter for the Monte Carlo estimate. Its single field `n` of
# type `T` is declared `@atomic`, so updates to it go through the `@atomic`
# macro.
mutable struct MCAtomic{T}
  @atomic n::T
end

# Estimate pi with the Monte Carlo method: draw `TRIAL` points uniformly from
# the unit square in a threaded loop, count the points that fall inside the
# quarter unit circle in the atomic counter `mc.n`, and return
# `4 * hits / TRIAL` as a `Float64`.
function main()
  TRIAL = 99999999

  mc = MCAtomic(0)

  Threads.@threads for i in 1:TRIAL
    # Draw from the default RNG via `rand()`, which is safe to call from
    # several threads at once. A single `MersenneTwister` shared by all
    # threads must not be used here: each draw mutates its internal state
    # without synchronization, which is a data race.
    x = rand()
    y = rand()

    if x^2 + y^2 < 1.0
      @atomic mc.n += 1
    end
  end

  # Read the counter with an explicit atomic load.
  return 4.0 * (@atomic mc.n) / TRIAL
end

実践: 結果

julia> main()
3.1415725914157258 # 真値: 3.14159265359...

まとめ

  • Juliaでのプリミティブなatomic操作はThreads.Atomic{T}
    • ただし、TはBool, Int8, Int16, Int32, Int64, Int128, UInt8, UInt16, UInt32, UInt64, UInt128, Float16, Float32, Float64のみ
  • v1.7から@atomicマクロで構造体のフィールドとしてatomic操作ができるようになった

その他関数いろいろ

ソースコード

https://gist.github.com/TsuMakoto/4138d3c2fd05a400d02eff0b91e34658