C# 线程同步详解

2019-12-30 16:23:07刘景俊

程序在执行的时候,编译器必须将变量m_flag和m_value从RAM读入CPU寄存器,RAM先传递m_value的值0,thread1把值变为5,但是thread2并不知道thread2仍然认为值为0,这种问题一般来说发生在多核CPU的概率大一些,应该CPU越多,多个线程同时访问资源的几率就越大。

关键字volatile,作用禁止C#编译器、JTP编译器和CPU执行的一些优化,如果做用于变量后,将不允许字段缓存到CPU的寄存器中,确保字段的读写都在RAM中进行。

互锁构造

System.Threading.Interlocked类中的每个方法都执行一次原子的读取以及写入操作,调用某个Interlocked方法之前的任何变量写入都在这个Interlocked方法调用之前执行,而调用之后的任何变量读取都在这个调用之后读取。

Interlocked方法主要是对INT32变量进行静态操作Add、Decrement、Compare、Exchange、CompareChange等方法,也接受object、Double等类型的参数。

原子操作:是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。

代码演示:

说明:通过Interlocked的方法异步查询几个web服务器,并同时返回数据,且结果只执行一次。


//上报状态类型
 enum CoordinationStatus
 {
 Cancel,
 Timeout,
 AllDone
 }

class AsyncCoordinator
 {
 //AllBegun 内部调用JustEnded来递减它
 private int _mOpCount = 1;
 //0=false,1=true
 private int _mStatusReported = 0;
 private Action<CoordinationStatus> _mCallback;
 private Timer _mTimer;
 //发起一个操作之前调用
 public void AboutToBegin(int opsToAdd = 1)
 {
 Interlocked.Add(ref _mOpCount, opsToAdd);
 }
 //处理好一个操作的结果之后调用
 public void JustEnded()
 {
 if (Interlocked.Decrement(ref _mOpCount) == 0)
 {
 ReportStatus(CoordinationStatus.AllDone);
 } 
 }
 //该方法必须在发起所有操作后调用
 public void AllBegin(Action<CoordinationStatus> callback, int timeout = Timeout.Infinite)
 {
 _mCallback = callback;
 if (timeout != Timeout.Infinite)
 {
 _mTimer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
 JustEnded();
 }
 }
 private void TimeExpired(object o)
 {
 ReportStatus(CoordinationStatus.Timeout);
 }
 public void Cancel()
 {
 ReportStatus(CoordinationStatus.Cancel);
 }
 private void ReportStatus(CoordinationStatus status)
 {
 //如果状态从未报告过,就报告它,否则就忽略它,只调用一次
 if (Interlocked.Exchange(ref _mStatusReported, 1) == 0)
 {
 _mCallback(status);
 } 
 }
 }

class MultiWebRequest
 {
 //辅助类 用于协调所有的异步操作
 private AsyncCoordinator _mac = new AsyncCoordinator();
 protected Dictionary<string,object> _mServers = new Dictionary<string, object>
 {
 {"http://www.easck.com//www.Microsoft.com",null},{"http://www.easck.com//www.souhu.com",null},{"http://www.easck.com//www.tencent.com",null},
 {"http://www.easck.com//通过异步方式一次性发起请求
 var httpclient = new HttpClient();
 foreach (var server in _mServers.Keys)
 {
 _mac.AboutToBegin(1);
 httpclient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
 }
 _mac.AllBegin(AllDone,timeout);
 Console.WriteLine("");
 }
 private void ComputeResult(string server, Task<Byte[]> task)
 {
 object result;
 if (task.Exception != null)
 {
 result = task.Exception.InnerException;
 }
 else
 {
 //线程池处理IO
 result = task.Result.Length;
 }
 //保存返回结果的长度
 _mServers[server] = result;
 _mac.JustEnded();
 }
 public void Cancel()
 {
 _mac.Cancel();
 }
 private void AllDone(CoordinationStatus status)
 {
 sp.Stop();
 Console.WriteLine("响应耗时总计{0}",sp.Elapsed);
 switch (status)
 {
 case CoordinationStatus.Cancel:
  Console.WriteLine("操作取消");
  break;
 case CoordinationStatus.AllDone:
  Console.WriteLine("操作完成,完成的结果如下");
  foreach (var server in _mServers)
  {
  Console.WriteLine("{0}",server.Key);
  object result = server.Value;
  if (result is Exception)
  {
  Console.WriteLine("错误原因{0}",result.GetType().Name);
  }
  else
  {
  Console.WriteLine("返回字节数为:{0}",result);
  }
  }
  break;
 case CoordinationStatus.Timeout:
  Console.WriteLine("操作超时");
  break;
 default:
  throw new ArgumentOutOfRangeException("status", status, null);
 }
 }
 }