Disruptor-NET和内存栅栏
Disruptor-NET算法(是一种无锁算法)需要我们自己实现某一种特定的内存操作的语义以保证算法的正确性。这时我们就需要显式的使用一些指令来控制内存操作指令的顺序以及其可见性定义。这种指令称为内存栅栏。
内存一致性模型需要在各种的程序与系统的各个层次上定义内存访问的行为。在机器码与的层次上,其定义将影响硬件的设计者以及机器码开发人员;而在高级语言层次上,其定义将影响高级语言开发人员以及编译器开发人员和硬件设计人员。即,内存操作的乱序在各个层次都是存在的。这里,所谓的程序的执行顺序有三种:
(1)程序顺序:指在特定CPU上运行的,执行内存操作的代码的顺序。这指的是编译好的程序二进制镜像中的指令的顺序。编译器并不一定严格按照程序的顺序进行二进制代码的编排。编译器可以按照既定的规则,在执行代码优化的时候打乱指令的执行顺序,也就是上面说的程序顺序。并且,编译器可以根据程序的特定行为进行性能优化,这种优化可能改变算法的形式与算法的执行复杂度。(例如将switch转化为表驱动序列) (2)执行顺序:指在CPU上执行的独立的内存相关的代码执行的顺序。执行顺序和程序顺序可能不同,这种不同是编译器和CPU优化造成的结果。CPU在执行期(Runtime)根据自己的内存模型(跟编译器无关)打乱已经编译好了的指令的顺序,以达到程序的优化和最大限度的资源利用。
(3)感知顺序:指特定的CPU感知到他自身的或者其他CPU对内存进行操作的顺序。感知顺序和执行顺序可能还不一样。这是由于缓存优化或者内存优化系统造成的。
而最终的共享内存模型的表现形式是由这三种“顺序”共同决定的。即从源代码到最终执行进行了至少三个层次上的代码顺序调整,分别是由编译器和CPU完成的。我们上面提到,这种代码执行顺序的改变虽然在单线程程序中不会引发副作用,但是在多线程程序中,这种作用是不能够被忽略的,甚至可能造成完全错误的结果。因此,在多线程程序中,我们有时需要人为的限制内存执行的顺序。而这种限制是通过不同层次的内存栅栏完成的。
Thread.MemoryBarrier就是采用了CPU提供的某些特定的指令的内存栅栏,下面是msdn的解释【http://msdn.microsoft.com/zh-cn/library/vstudio/system.threading.thread.memorybarrier(v=vs.100).aspx】:
Thread.MemoryBarrier: 按如下方式同步内存访问:执行当前线程的处理器在对指令重新排序时,不能采用先执行 MemoryBarrier 调用之后的内存访问,再执行 MemoryBarrier 调用之前的内存访问的方式。
按照我个人的理解:就是写完数据之后,调用MemoryBarrier,数据就会立即刷新,另外在读取数据之前调用MemoryBarrier可以确保读取的数据是最新的,并且处理器对MemoryBarrier的优化小心处理。
int _answer;
bool _complete;
void A()
{
_answer = 123;
Thread.MemoryBarrier(); //在写完之后,创建内存栅栏
_complete = true;
Thread.MemoryBarrier();//在写完之后,创建内存栅栏
}
void B()
{
Thread.MemoryBarrier();//在读取之前,创建内存栅栏
if (_complete)
{
Thread.MemoryBarrier();//在读取之前,创建内存栅栏
Console.WriteLine(_answer);
}
}
Disruptor-NET正是通过Thread.MemoryBarrier 实现无锁和线程安全的内存操作,看下面是他的Atomic的Volatile类对常用数据类型的封装,volatile可以阻止代码重排序,并且值被更新的时候,会导致缓存失效,强制回写到主存中。
/// <summary>
/// An integer value that may be updated atomically
/// </summary>
public struct Integer
{
private int _value;
/// <summary>
/// Create a new <see cref="Integer"/> with the given initial value.
/// </summary>
/// <param name="value">Initial value</param>
public Integer(int value)
{
_value = value;
}
/// <summary>
/// Read the value without applying any fence
/// </summary>
/// <returns>The current value</returns>
public int ReadUnfenced()
{
return _value;
}
/// <summary>
/// Read the value applying acquire fence semantic
/// </summary>
/// <returns>The current value</returns>
public int ReadAcquireFence()
{
var value = _value;
Thread.MemoryBarrier();
return value;
}
/// <summary>
/// Read the value applying full fence semantic
/// </summary>
/// <returns>The current value</returns>
public int ReadFullFence()
{
var value = _value;
Thread.MemoryBarrier();
return value;
}
/// <summary>
/// Read the value applying a compiler only fence, no CPU fence is applied
/// </summary>
/// <returns>The current value</returns>
[MethodImpl(MethodImplOptions.NoOptimization)]
public int ReadCompilerOnlyFence()
{
return _value;
}
/// <summary>
/// Write the value applying release fence semantic
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteReleaseFence(int newValue)
{
_value = newValue;
Thread.MemoryBarrier();
}
/// <summary>
/// Write the value applying full fence semantic
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteFullFence(int newValue)
{
_value = newValue;
Thread.MemoryBarrier();
}
/// <summary>
/// Write the value applying a compiler fence only, no CPU fence is applied
/// </summary>
/// <param name="newValue">The new value</param>
[MethodImpl(MethodImplOptions.NoOptimization)]
public void WriteCompilerOnlyFence(int newValue)
{
_value = newValue;
}
/// <summary>
/// Write without applying any fence
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteUnfenced(int newValue)
{
_value = newValue;
}
/// <summary>
/// Atomically set the value to the given updated value if the current value equals the comparand
/// </summary>
/// <param name="newValue">The new value</param>
/// <param name="comparand">The comparand (expected value)</param>
/// <returns></returns>
public bool AtomicCompareExchange(int newValue, int comparand)
{
return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
}
/// <summary>
/// Atomically set the value to the given updated value
/// </summary>
/// <param name="newValue">The new value</param>
/// <returns>The original value</returns>
public int AtomicExchange(int newValue)
{
return Interlocked.Exchange(ref _value, newValue);
}
/// <summary>
/// Atomically add the given value to the current value and return the sum
/// </summary>
/// <param name="delta">The value to be added</param>
/// <returns>The sum of the current value and the given value</returns>
public int AtomicAddAndGet(int delta)
{
return Interlocked.Add(ref _value, delta);
}
/// <summary>
/// Atomically increment the current value and return the new value
/// </summary>
/// <returns>The incremented value.</returns>
public int AtomicIncrementAndGet()
{
return Interlocked.Increment(ref _value);
}
/// <summary>
/// Atomically increment the current value and return the new value
/// </summary>
/// <returns>The decremented value.</returns>
public int AtomicDecrementAndGet()
{
return Interlocked.Decrement(ref _value);
}
/// <summary>
/// Returns the string representation of the current value.
/// </summary>
/// <returns>the string representation of the current value.</returns>
public override string ToString()
{
var value = ReadFullFence();
return value.ToString();
}
}
/// <summary>
/// An integer value that may be updated atomically and is guaranteed to live on its own cache line (to prevent false sharing)
/// </summary>
[StructLayout(LayoutKind.Explicit, Size = CacheLineSize * 2)]
public struct PaddedInteger
{
[FieldOffset(CacheLineSize)]
private int _value;
/// <summary>
/// Create a new <see cref="PaddedInteger"/> with the given initial value.
/// </summary>
/// <param name="value">Initial value</param>
public PaddedInteger(int value)
{
_value = value;
}
/// <summary>
/// Read the value without applying any fence
/// </summary>
/// <returns>The current value</returns>
public int ReadUnfenced()
{
return _value;
}
/// <summary>
/// Read the value applying acquire fence semantic
/// </summary>
/// <returns>The current value</returns>
public int ReadAcquireFence()
{
var value = _value;
Thread.MemoryBarrier();
return value;
}
/// <summary>
/// Read the value applying full fence semantic
/// </summary>
/// <returns>The current value</returns>
public int ReadFullFence()
{
var value = _value;
Thread.MemoryBarrier();
return value;
}
/// <summary>
/// Read the value applying a compiler only fence, no CPU fence is applied
/// </summary>
/// <returns>The current value</returns>
[MethodImpl(MethodImplOptions.NoOptimization)]
public int ReadCompilerOnlyFence()
{
return _value;
}
/// <summary>
/// Write the value applying release fence semantic
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteReleaseFence(int newValue)
{
_value = newValue;
Thread.MemoryBarrier();
}
/// <summary>
/// Write the value applying full fence semantic
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteFullFence(int newValue)
{
_value = newValue;
Thread.MemoryBarrier();
}
/// <summary>
/// Write the value applying a compiler fence only, no CPU fence is applied
/// </summary>
/// <param name="newValue">The new value</param>
[MethodImpl(MethodImplOptions.NoOptimization)]
public void WriteCompilerOnlyFence(int newValue)
{
_value = newValue;
}
/// <summary>
/// Write without applying any fence
/// </summary>
/// <param name="newValue">The new value</param>
public void WriteUnfenced(int newValue)
{
_value = newValue;
}
/// <summary>
/// Atomically set the value to the given updated value if the current value equals the comparand
/// </summary>
/// <param name="newValue">The new value</param>
/// <param name="comparand">The comparand (expected value)</param>
/// <returns></returns>
public bool AtomicCompareExchange(int newValue, int comparand)
{
return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
}
/// <summary>
/// Atomically set the value to the given updated value
/// </summary>
/// <param name="newValue">The new value</param>
/// <returns>The original value</returns>
public int AtomicExchange(int newValue)
{
return Interlocked.Exchange(ref _value, newValue);
}
/// <summary>
/// Atomically add the given value to the current value and return the sum
/// </summary>
/// <param name="delta">The value to be added</param>
/// <returns>The sum of the current value and the given value</returns>
public int AtomicAddAndGet(int delta)
{
return Interlocked.Add(ref _value, delta);
}
/// <summary>
/// Atomically increment the current value and return the new value
/// </summary>
/// <returns>The incremented value.</returns>
public int AtomicIncrementAndGet()
{
return Interlocked.Increment(ref _value);
}
/// <summary>
/// Atomically increment the current value and return the new value
/// </summary>
/// <returns>The decremented value.</returns>
public int AtomicDecrementAndGet()
{
return Interlocked.Decrement(ref _value);
}
/// <summary>
/// Returns the string representation of the current value.
/// </summary>
/// <returns>the string representation of the current value.</returns>
public override string ToString()
{
var value = ReadFullFence();
return value.ToString();
}
}
PaddedInteger类,它使用了7个Integer,加上一个对象头,刚好64个字节。
剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充
- 微信开发扫一扫功能并且屏蔽分享菜单
- HTML语义化:HTML5新标签——template
- 纸上谈兵: 左倾堆 (leftist heap)
- 统计Go, Go, Go
- 前端构建:Source Maps详解
- 解决YUM下Loaded plugins: fastestmirror Determining fastest mirrors 的问题
- 被解放的姜戈01 初试天涯
- 被解放的姜戈02 庄园疑云
- eclipse汉化
- Java魔法堂:URI、URL(含URL Protocol Handler)和URN
- 语义化HTML:ul、ol和dl
- 程序员电邮札记
- JavaSE(十)之反射
- windows下安装redis
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【Pytorch 】笔记二:动态图、自动求导及逻辑回归
- 听说国漫最近崛起了,那我们就来爬几部国漫看看(动态加载,反爬)
- 微信小程序开发实战(25):预览图像
- 【Pytorch】笔记三:数据读取机制与图像预处理模块
- 表白利器,马赛克拼贴照片制作
- 【014期】JavaSE面试题(十四):基本IO流
- 微信小程序开发实战(24):选择图像
- 反 996 有理:催程序员交代码,写不出好软件
- 一千个不用 Null 的理由!
- WebAssembly 是 Deno 的好搭档
- Chrome开发者工具的11个高级使用技巧
- 怒爬某破Hub站资源,简单4步撸个鉴黄平台!
- 审阅“史上”最烂的代码
- BeanUtils 是用 Spring 的还是 Apache 的好?
- 一看就会的mysql索引优化(真实案例)