信号量及其应用

时间:2022-07-22
本文章向大家介绍信号量及其应用,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

信号量用于解决进程的互斥,同步问题,可以使用信号量来表示系统中某种资源的数量。

信号量机制的发明者迪杰特斯拉也是我们的老熟人了。

整型信号量

用一个整型变量作为信号量,表示系统中的某种资源的数量。

对于信号量的有两种操作P操作和V操作。

如下为伪代码说明P V操作的细节。

public class Semaphore{
    int s;
    // 资源获取
    void P(){
        // 当前没有可用资源时,当前进程就会在此死等,直到其他进程使用完了释放了资源。
        while(s <= 0){};
        s--;
    }
    // 资源释放
    void V(){
        s++;
    }
}

注意上述的P操作和V操作都是原子性的。

上述代码中的若把s为当前可用资源的数量,P操作表示获取一个资源,V操作表示释放一个资源。使用资源前先进行P操作,使用完后进行V操作。

我们也发现了,在无可用资源时,该进程会一直占着处理机死等。aging情况下应该让进程进入阻塞态让出处理机资源,当有资源了就唤醒他,于是引入了记录型信号量的机制。

记录型信号量

伪代码描述如下:

public class Semaphore{
    int s;
    Queue waitQueue; // 等待队列
    void P(){
        s--;
        if(s < 0){
            waitQueue.block();
        }
    }

    void V(){
        s++;
        if(s <= 0){
            waitQueue.wakeup();
        }
    }
}

P操作先申请资源,再判断s,若发现无资源就使用block原语使当前进程由运行态转为阻塞态,并挂到等待队列的队尾;

V操作先释放资源,然后判断是否存在别进程等待该资源,若存在则使用wakeup原语唤醒等待队列的一个进程,将该进程由阻塞态转为就绪态。

当一个进程暂时得不到资源时,其就会自我阻塞,其就放弃处理机,将自己挂到等待队列上,与计数型信号量相比遵循了“让权等待”,即不会出现忙等。

信号量实现进程互斥

初始化时信号量的s为1,认为只有一块资源。

在访问临界区之前执行p操作申请资源;

访问完之后执行v操作释放资源。

利用Java信号量模拟模拟互斥访问临界区代码如下:

     public static class T implements Runnable{
        private Semaphore semaphore;
        public T(Semaphore semaphore) {
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入临界区");
                Thread.sleep(10);
                System.out.println(Thread.currentThread().getName() + "正在访问临界区");
                Thread.sleep(10);
                System.out.println(Thread.currentThread().getName() + "访问完了");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                 e.printStackTrace();
            }
            semaphore.release();     
        }   
     }

    public static void main(String[] args) {
        Semaphore sema = new Semaphore(1);
        Runnable r = new T(sema);
        for(int i = 0; i < 5; i++) {
            new Thread(r).start();
        }
    }

执行结果如下,可以发现每个线程是互斥访问的,不存在多个线程同时进入临界区的情况。

Thread-1进入临界区
Thread-1正在访问临界区
Thread-1访问完了
Thread-2进入临界区
Thread-2正在访问临界区
Thread-2访问完了
Thread-4进入临界区
Thread-4正在访问临界区
Thread-4访问完了
Thread-0进入临界区
Thread-0正在访问临界区
Thread-0访问完了
Thread-3进入临界区
Thread-3正在访问临界区
Thread-3访问完了

若不使用信号量,执行结果如下:

Thread-0进入临界区
Thread-1进入临界区
Thread-2进入临界区
Thread-3进入临界区
Thread-4进入临界区
Thread-2正在访问临界区
Thread-4正在访问临界区
Thread-3正在访问临界区
Thread-0正在访问临界区
Thread-1正在访问临界区
Thread-1访问完了
Thread-3访问完了
Thread-0访问完了
Thread-4访问完了
Thread-2访问完了

信号量实现进程同步

进程同步:让各并发的进程按照我们事先规定的顺序执行,即始终保证“后操作”在“前操作”之后发生。

初始化信号量初值为0,可以认为初始无资源,只有执行一次V操作才会获得一个资源,此时P操作才可以执行。

利用该特性在“前操作”之后使用V操作,“后操作”之前使用P操作。若后操作想先执行其必须执行一次P操作此时由于不存在资源,因此该进程在此阻塞住了,只有先执行完”前操作”,再执行完“前操作”之后的V操作,此时才会把之前阻塞态的进程转到就绪态,“后操作”才得以执行。

下面用一个具体案例说明。假设有两个人一个人专门倒水,一个人专门喝水,因此倒水的人的倒水操作应该为“前操作”,喝水的人喝水操作为“后操作”,利用Java的信号量实现该问题。

     // 倒水者
     public static class Maker implements Runnable{
         Semaphore semaphore;
         public Maker(Semaphore semaphore) {
             this.semaphore = semaphore;
         }
         public void make() {
             System.out.println("我倒水了");
         }
        @Override
        public void run() {
            make();
            // 前操作之后使用V操作
            semaphore.release(); 

        }

     }
     // 喝水者
     public static class Drinker implements Runnable{
         Semaphore semaphore;
         public Drinker(Semaphore semaphore) {
             this.semaphore = semaphore;
         }
         public void drink() {
             System.out.println("我喝水了");
         }
         @Override
         public void run() {
             try {
                semaphore.acquire();
                drink();

            } catch (InterruptedException e) {
                 e.printStackTrace();
            }    
         } 
      }
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(0);
        Runnable maker = new Maker(semaphore);
        Runnable drinker = new Drinker(semaphore);
        new Thread(maker).start();
        new Thread(drinker).start();
    }

信号量实现生产者消费者模型

生产者消费者模型介绍:

系统中一组生产者进程一组消费者进程,生产者每次生产一个产品放入缓冲区,消费者一次从缓冲区中拿出一个消费掉。

生产者,消费者共享一个初始为空,最大可放N个商品的缓冲区。缓冲区不满时才可以生产,缓冲区不空时才可以消费,此外缓冲区属于临界资源,各个进程应该互斥访问。

大体思路:

我们发现生产者消费者模型中共两个同步关系,一个互斥关系。

生产者每次生产消耗一块缓冲区,获得一个商品;消费者每次消耗一个商品,获得一块缓冲区。

同步关系一:商品这种资源为空时,对于商品这种资源生产者生产为前操作,消费者消费为后操作;

同步关系二:缓冲区这种资源为空时,对于缓冲区这种资源消费者消费为前操作,生产者生产为后操作。

互斥关系:每次生产/消费都应该互斥的访问缓冲区。

因此我们需要使用三个信号量,分别为goodsSemaphore,bufferSemaphore,mutexSemaphore来维持上述三种关系。其初值分别为0, n, 1。

具体实现:

public class ProductorConsummer {
     public static class Productor implements Runnable{
        private Semaphore goodsSemaphore;
        private Semaphore bufferSemaphore;
        private Semaphore mutexSemaphore; 
        public Productor(Semaphore goodsSemaphore, Semaphore bufferSemaphore, Semaphore mutexSemaphore) {
            this.goodsSemaphore = goodsSemaphore;
            this.bufferSemaphore = bufferSemaphore;
            this.mutexSemaphore = mutexSemaphore;
        }
        public void product() {
            System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个商品");
        }
        @Override
        public void run() {
            try {
                while(true) {      
                    // 对于缓冲区而言,消费为前操作,生产为后操作,因此后操作之前执行P操作
                    bufferSemaphore.acquire();

                    mutexSemaphore.acquire();
                    product();
                    mutexSemaphore.release();

                    // 对于商品而言,生产为前操作,消费为后操作,因此前操作之后执行V操作
                    goodsSemaphore.release();
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                 e.printStackTrace();
            }
        }

     }
     public static class Consummer implements Runnable{
         private Semaphore goodsSemaphore;
         private Semaphore bufferSemaphore;
         private Semaphore mutexSemaphore; 
         public Consummer(Semaphore goodsSemaphore, Semaphore bufferSemaphore, Semaphore mutexSemaphore) {
             this.goodsSemaphore = goodsSemaphore;
             this.bufferSemaphore = bufferSemaphore;
             this.mutexSemaphore = mutexSemaphore;
         }
         public void consum() {
             System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个商品");
         }
         @Override
         public void run() {
             try {
                 while(true) {
                     // 对于商品而言,生产为前操作,消费为后操作,因此后操作之后执行商品信号量P操作
                     goodsSemaphore.acquire();

                     mutexSemaphore.acquire();
                     consum();
                     mutexSemaphore.release();

                     // 对于缓冲区而言,消费为前操作,生产为后操作,因此前操作执行之后执行缓冲区信号量的V操作
                     bufferSemaphore.release();
                 }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                 e.printStackTrace();
            }  
         }

      }
    public static void main(String[] args) {
        int n = 3;
        Semaphore goodsSemaphore = new Semaphore(0);
        Semaphore bufferSemaphore = new Semaphore(n);
        Semaphore mutexSemaphore = new Semaphore(1); 
        Runnable productor = new Productor(goodsSemaphore, bufferSemaphore, mutexSemaphore);
        Runnable consummer = new Consummer(goodsSemaphore, bufferSemaphore, mutexSemaphore);
        for(int i = 0; i < 3; i++) {
            new Thread(consummer).start();
            new Thread(productor).start();
        }
    }

}

上述代码中两个p操作不能交换顺序,否则会产生死锁。分析如下

prodctor{
    mutex.p(); (1)
    buffer.p(); (2)
    生产一件商品
    ... 
}
consumer{
    mutex.p();(3)
    goods.p();(4)
    消费一件商品
    ...
}

按照(3)(4)(1)的顺序执行,执行完(3)(4)mutex = 0,goods = -1,此时其等待一个生产者生产一个商品其才可以执行后续部分,最终再释放mutex;然而生产者线程根本进不去缓冲区也阻塞住了因此产生了死锁。

信号量实现吸烟者问题

当前系统中总共四个进程,三个吸烟者进程,一个提供者进程。每次吸烟需要三种材料烟草、纸以及胶水。这三个人各拥有一样,供应者无限制的提供其余两样,抽烟者抽完后供应者会放另外两种材料,从而达到让这三位爷轮流抽烟的目的。

我们发现,只有供应者放了抽烟者才能抽,只有抽烟者抽了供应者才能接着放。

其中存在四个同步关系,

同步关系一:供应者放了组合一抽烟者1号才能抽烟,对于组合一这种资源供应者放为前操作,抽烟者1号抽为后操作;

同步关系二:供应者放了组合二抽烟者2号才能抽烟,同理;

同步关系三:供应者放了组合三抽烟者3号才能抽烟,同理;

同步关系四:抽烟者1/2/3号只有拿了桌上的材料,供应者才能接着放,对于桌面这种资源,抽烟者拿为先操作,供应者放为后操作。

此外对于无论是放还是拿都应该满足互斥访问桌面这块区域。

实现代码如下:

public class Smoking {
     public static class Smoker implements Runnable{
         Semaphore material;
         Semaphore buffer;
         Semaphore mutex;

         public Smoker(Semaphore material, Semaphore buffer, Semaphore mutex) {
            this.material = material;
            this.buffer = buffer;
            this.mutex = mutex;
        }
        public void smok() {
             System.out.println(Thread.currentThread().getName() + " 抽烟了");
         }
        public void run() {
            try {
                while(true) {
                    material.acquire();
                    mutex.acquire();
                    smok();
                    mutex.release();;
                    buffer.release();
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                 e.printStackTrace();
            }

        } 
     }
     public static class Provider implements Runnable{
         Semaphore[] materials; // 每个位置代表一种组合
         Semaphore buffer;
         Semaphore mutex;
         public Provider(Semaphore[] materials, Semaphore buffer, Semaphore mutex) {
            this.materials = materials;
            this.buffer = buffer;
            this.mutex = mutex;
        }
        public void provide1() {
             System.out.println(Thread.currentThread().getName() + "提供组合一");
         }
         public void provide2() {
             System.out.println(Thread.currentThread().getName() + "提供组合二");
         }
         public void provide3() {
             System.out.println(Thread.currentThread().getName() + "提供组合三");
         }
        @Override
        public void run() {
            try {
                while(true) {
                    buffer.acquire();
                    mutex.acquire();
                    provide1();
                    mutex.release();
                    materials[0].release();

                    buffer.acquire();
                    mutex.acquire();
                    provide2();
                    mutex.release();
                    materials[1].release();

                    buffer.acquire();
                    mutex.acquire();
                    provide3();
                    mutex.release();
                    materials[2].release(); 
                }
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
        }
     }
    public static void main(String[] args) {
        Semaphore[] materials = new Semaphore[]{new Semaphore(0),new Semaphore(0),new Semaphore(0)};
        Semaphore buffer = new Semaphore(1);
        Semaphore mutex = new Semaphore(1);
        new Thread(new Provider(materials, buffer, mutex)).start(); // 提供着
        for(Semaphore material : materials) {
            new Thread(new Smoker(material, buffer, mutex)).start();
        }
    }

}

信号量实现哲学家进餐问题

一张大圆桌子上坐着五位名声显赫的哲学家,他们中间五个空隙处放着五只筷子,哲学家要想吃放,其第一步拿到他左手边的筷子,第二步拿到他右手边的筷子,然后可以开始吃饭。

由于存在5只筷子,因此使用5个信号量表示,其初值均为1。5个哲学家可以看成5个进程。代码实现如下:

/**
  * 哲学家进餐问题
  * @author wg
  * @date 2020/07/07
  */
 public class PhilosophersDinning {
     public static class Philosopher implements Runnable{
         // 哲学家id
         int id;

         // chopsticks[id] 为其左手边的筷子
         // chopsticks[(id + 1 )% 5] 为其右手边的筷子
         Semaphore[] chopsticks;
         public Philosopher(Semaphore[] chopsticks, int id) {
             this.chopsticks = chopsticks;
             this.id = id;
         }
         public void eat() {
             System.out.println(id + "号哲学家正在进餐");
         }
        @Override
        public void run() {
            try {
                // 拿起左手边筷子
                chopsticks[id].acquire();
                Thread.sleep(1);
                // 拿起右手边筷子
                chopsticks[(id + 1) % chopsticks.length].acquire();
                eat();
                // 放下左手边筷子
                chopsticks[(id + 1) % chopsticks.length].release();
                // 放下右手边筷子
                chopsticks[id].release();
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
        } 

     }
    public static void main(String[] args) {
        Semaphore[] chopsticks = new Semaphore[5];
        for(int i = 0; i < 5; i++) {
            chopsticks[i] = new Semaphore(1);
        }
        for(int i = 0; i < 5; i++) {
            new Thread(new Philosopher(chopsticks, i)).start();
        }

    }
}

很容易分析得到当每个哲学家都拿起自己左手边的筷子时,就会陷入死锁。

信号量实现阻塞容器

以阻塞队列为例,利用信号量 + queue 实现阻塞队列。

对于阻塞队列当队列中没元素时,从队列中取元素的操作不应该被失败而是阻塞该进程,当别的进程放进去元素后该进程由阻塞态变为就绪态,然后从中取出元素。当阻塞队列满了时,往队列中加元素的操作应该被阻塞。

我们发现存在两种资源,元素 和 空间 ,为其定义两个信号量记做itemS,和bufferS,此外为了互斥访问队列还定义互斥量mutexS,初值如下:

itemS.s = 0, bufferS.s = n, mutexS = 1。

其中n为队列的最大容量。

思路如下:

把信号量itemS和队列中的元素逻辑上相对应,在执行获取队列中元素之前,先执行itemS.P(),若此时队列中无元素也就意味着itemS的值为0,该进程就此阻塞。执行添加操作后再执行itemS.V(),如此队列中元素多了一个,itemS中资源亦多了一个,此时之前阻塞住的进程就可以恢复了。如此解决了队列为空时取元素的场景,对于队列满了时加元素的场景同理,就不多加赘述了。

实现代码如下:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BlockingQueue<E> {
     private Semaphore itemS;
     private Semaphore bufferS;
     private Semaphore mutex;
     private Queue<E> data;

     public BlockingQueue(int boundary) {
         this.itemS = new Semaphore(0);
         this.bufferS = new Semaphore(boundary);
         this.mutex = new Semaphore(1);
         this.data = new LinkedList<E>();
     }
     public void add(E element) throws InterruptedException {
         bufferS.acquire();
         mutex.acquire();
         data.add(element);
         mutex.release();
         itemS.release();
     }

     public E remove() throws InterruptedException {
         E element = null;
         itemS.acquire();
         mutex.acquire();
         element = data.remove();
         mutex.release();
         bufferS.release();
         return element;
     }
}

测试代码如下

class Adder implements Runnable{
    private BlockingQueue<Integer> queue;
    public Adder(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    public void run() {
        for(int i = 0; i < 100000; i++) {
            try {
                queue.add(i);
                System.out.println("adder 放入了" + i);
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
        }
    }
}

class Remover implements Runnable{
    private BlockingQueue<Integer> queue;
    public Remover(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    public void run() {
        for(int i = 0; i < 100000; i++) {
            try {
                System.out.println("remover 取出了" + queue.remove());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
        }
    }
}
public class Test {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new BlockingQueue<Integer>(5);
        for(int i = 0; i < 5; i++) {
//            new Thread(new Adder(queue)).start();
            new Thread(new Remover(queue)).start();
        }

    }
}