Java IO - 源码: InputStream

本文主要从JDK 11 源码角度分析InputStream。 @pdai

InputStream 类实现关系

InputStream是输入字节流,具体的实现类层次结构如下:

InputStream 抽象类

InputStream 类重要方法设计如下:

// 读取下一个字节,如果没有则返回-1
public abstract int read() 

// 将读取到的数据放在 byte 数组中,该方法实际上调用read(byte b[], int off, int len)方法
public int read(byte b[]) 

// 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的; 此方法会一直阻止,直到输入数据可用、检测到stream结尾或引发异常为止。
public int read(byte b[], int off, int len) 

// JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
public byte[] readAllBytes()

// JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public byte[] readNBytes(int len)

// JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public int readNBytes(byte[] b, int off, int len)

// 跳过指定个数的字节不读取
public long skip(long n) 

// 返回可读的字节数量
public int available() 

// 读取完,关闭流,释放资源
public void close() 

// 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
public synchronized void mark(int readlimit) 

// 重置读取位置为上次 mark 标记的位置
public synchronized void reset() 

// 判断当前流是否支持标记流,和上面两个方法配套使用
public boolean markSupported() 

// JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
public long transferTo(OutputStream out)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

源码实现

梳理部分InputStream及其实现类的源码分析。

InputStream

InputStream抽象类源码如下:

public abstract class InputStream implements Closeable {
    
    // 当使用skip方法时,最大的buffer size大小
    private static final int MAX_SKIP_BUFFER_SIZE = 2048;

    // 默认的buffer size
    private static final int DEFAULT_BUFFER_SIZE = 8192;

    // JDK11中增加了一个nullInputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
    public static InputStream nullInputStream() {
        return new InputStream() {
            private volatile boolean closed;

            private void ensureOpen() throws IOException {
                if (closed) {
                    throw new IOException("Stream closed");
                }
            }

            @Override
            public int available () throws IOException {
                ensureOpen();
                return 0;
            }

            @Override
            public int read() throws IOException {
                ensureOpen();
                return -1;
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                Objects.checkFromIndexSize(off, len, b.length);
                if (len == 0) {
                    return 0;
                }
                ensureOpen();
                return -1;
            }

            @Override
            public byte[] readAllBytes() throws IOException {
                ensureOpen();
                return new byte[0];
            }

            @Override
            public int readNBytes(byte[] b, int off, int len)
                throws IOException {
                Objects.checkFromIndexSize(off, len, b.length);
                ensureOpen();
                return 0;
            }

            @Override
            public byte[] readNBytes(int len) throws IOException {
                if (len < 0) {
                    throw new IllegalArgumentException("len < 0");
                }
                ensureOpen();
                return new byte[0];
            }

            @Override
            public long skip(long n) throws IOException {
                ensureOpen();
                return 0L;
            }

            @Override
            public long transferTo(OutputStream out) throws IOException {
                Objects.requireNonNull(out);
                ensureOpen();
                return 0L;
            }

            @Override
            public void close() throws IOException {
                closed = true;
            }
        };
    }
    
    // 读取下一个字节的数据,如果没有则返回-1
    public abstract int read() throws IOException;

    // 将读取到的数据放在 byte 数组中,该方法实际上调用read(byte b[], int off, int len)方法
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }

    // 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的; 此方法会一直阻止,直到输入数据可用、检测到stream结尾或引发异常为止。
    public int read(byte b[], int off, int len) throws IOException {
        // 检查边界
        Objects.checkFromIndexSize(off, len, b.length);
        if (len == 0) {
            return 0;
        }

        // 读取下一个字节
        int c = read();
        if (c == -1) { // 读到stream末尾,则返回读取的字节数量为-1
            return -1;
        }
        b[off] = (byte)c;

        // i用来记录取了多少个字节
        int i = 1;
        try {
            // 循环读取
            for (; i < len ; i++) {
                c = read();
                if (c == -1) {// 读到stream末尾,则break
                    break;
                }
                b[off + i] = (byte)c;
            }
        } catch (IOException ee) {
        }
        // 返回读取到的字节个数
        return i;
    }

    // 分配的最大数组大小。
    // 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
    private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

    // JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
    public byte[] readAllBytes() throws IOException {
        return readNBytes(Integer.MAX_VALUE);
    }

    // JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
    public byte[] readNBytes(int len) throws IOException {
        // 边界检查
        if (len < 0) {
            throw new IllegalArgumentException("len < 0");
        }

        List<byte[]> bufs = null; // 缓存每次读取到的内容放到bufs,最后组装成result
        byte[] result = null; // 最后读取到的内容
        int total = 0;
        int remaining = len; // 剩余字节长度
        int n;
        do {
            byte[] buf = new byte[Math.min(remaining, DEFAULT_BUFFER_SIZE)];
            int nread = 0;

            // 读取到结束为止,读取大小n可能大于或小于缓冲区大小
            while ((n = read(buf, nread,
                    Math.min(buf.length - nread, remaining))) > 0) {
                nread += n; 
                remaining -= n;
            }

            if (nread > 0) {
                if (MAX_BUFFER_SIZE - total < nread) {
                    throw new OutOfMemoryError("Required array size too large");
                }
                total += nread;
                if (result == null) {
                    result = buf;
                } else {
                    if (bufs == null) {
                        bufs = new ArrayList<>();
                        bufs.add(result);
                    }
                    bufs.add(buf);
                }
            }
            // 如果读不到内容(返回-1)或者没有剩余的字节,则跳出循环
        } while (n >= 0 && remaining > 0);

        if (bufs == null) {
            if (result == null) {
                return new byte[0];
            }
            return result.length == total ?
                result : Arrays.copyOf(result, total);
        }

        // 组装最后的result
        result = new byte[total];
        int offset = 0;
        remaining = total;
        for (byte[] b : bufs) {
            int count = Math.min(b.length, remaining);
            System.arraycopy(b, 0, result, offset, count);
            offset += count;
            remaining -= count;
        }

        return result;
    }

    // JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
    public int readNBytes(byte[] b, int off, int len) throws IOException {
        Objects.checkFromIndexSize(off, len, b.length);

        int n = 0;
        while (n < len) {
            int count = read(b, off + n, len - n);
            if (count < 0)
                break;
            n += count;
        }
        return n;
    }

    // 跳过指定个数的字节不读取
    public long skip(long n) throws IOException {

        long remaining = n;
        int nr;

        if (n <= 0) {
            return 0;
        }

        int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining);
        byte[] skipBuffer = new byte[size];
        while (remaining > 0) {
            nr = read(skipBuffer, 0, (int)Math.min(size, remaining));
            if (nr < 0) {
                break;
            }
            remaining -= nr;
        }

        return n - remaining;
    }

    // 返回可读的字节数量
    public int available() throws IOException {
        return 0;
    }

    // 读取完,关闭流,释放资源
    public void close() throws IOException {}

    // 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
    public synchronized void mark(int readlimit) {}

    // 重置读取位置为上次 mark 标记的位置
    public synchronized void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }

    // 判断当前流是否支持标记流,和上面两个方法配套使用。默认是false,由子类方法重写
    public boolean markSupported() {
        return false;
    }

    // JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
    public long transferTo(OutputStream out) throws IOException {
        Objects.requireNonNull(out, "out");
        long transferred = 0;
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int read;
        while ((read = this.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
            out.write(buffer, 0, read);
            transferred += read;
        }
        return transferred;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266

总结下JDK9的更新点

类 java.io.InputStream 中增加了新的方法来读取和复制 InputStream 中包含的数据。

  • readAllBytes:读取 InputStream 中的所有剩余字节。
  • readNBytes: 从 InputStream 中读取指定数量的字节到数组中。
  • transferTo:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中 。
public class TestInputStream {
    private InputStream inputStream;
    private static final String CONTENT = "Hello World";
    @Before
    public void setUp() throws Exception {
        this.inputStream =
            TestInputStream.class.getResourceAsStream("/input.txt");
    }
    @Test
    public void testReadAllBytes() throws Exception {
        final String content = new String(this.inputStream.readAllBytes());
        assertEquals(CONTENT, content);
    }
    @Test
    public void testReadNBytes() throws Exception {
        final byte[] data = new byte[5];
        this.inputStream.readNBytes(data, 0, 5);
        assertEquals("Hello", new String(data));
    }
    @Test
    public void testTransferTo() throws Exception {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        this.inputStream.transferTo(outputStream);
        assertEquals(CONTENT, outputStream.toString());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • read(byte[], int, int)readNBytes(byte[], int, int)看似是实现的相同功能,为何会设计readNBytes方法呢

这个问题可以参看这里 (opens new window)

  1. read(byte[], int, int)是尝试读到最多len个bytes,但是读取到的内容长度可能是小于len的。
  2. readNBytes(byte[], int, int) 会一直(while循环)查找直到stream尾为止

举个例子:如果文本内容是12345<end>, read(s,0,10)是允许返回123的, 而readNbytes(s,0,10)会一直(while循环)查找直到stream尾为止,并返回12345.

补充下JDK11为什么会增加nullInputStream方法的设计?即空对象模式

  • 空对象模式

举个例子:

public class MyParser implements Parser {
  private static Action NO_ACTION = new Action() {
    public void doSomething() { /* do nothing */ }
  };

  public Action findAction(String userInput) {
    // ...
    if ( /* we can't find any actions */ ) {
      return NO_ACTION;
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12

然后便可以始终可以这么调用,而不用再判断空了

ParserFactory.getParser().findAction(someInput).doSomething();
1

FilterInputStream

FilterInputStream 源码如下

public class FilterInputStream extends InputStream {
    
    // 被装饰的inputStream
    protected volatile InputStream in;
    
    // 构造函数,注入被装饰的inputStream
    protected FilterInputStream(InputStream in) {
        this.in = in;
    }

    // 本质是调用被装饰的inputStream的方法
    public int read() throws IOException {
        return in.read();
    }
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
     }
    public int read(byte b[], int off, int len) throws IOException {
        return in.read(b, off, len);
     }
    public long skip(long n) throws IOException {
        return in.skip(n);
    }
    public int available() throws IOException {
        return in.available();
    }
    public void close() throws IOException {
        in.close();
    }
    public synchronized void mark(int readlimit) {
        in.mark(readlimit);
     }
    public synchronized void reset() throws IOException {
        in.reset();
    }
    public boolean markSupported() {
        return in.markSupported();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

为什么被装饰的inputStream是volatile类型的

请参看: 关键字: volatile详解

ByteArrayInputStream

ByteArrayInputStream源码如下

public class ByteArrayInputStream extends InputStream {
    
    // 内部保存的byte 数组
    protected byte buf[];
    
    // 读取下一个字节的数组下标,byte[pos]就是read获取的下个字节
    protected int pos;
    
    // mark的数组下标位置
    protected int mark = 0;
    
    // 保存的有效byte的个数
    protected int count;

    // 构造方法
    public ByteArrayInputStream(byte buf[]) {
        this.buf = buf;              
        this.pos = 0;
        this.count = buf.length;
     }
    
    // 构造方法,带offset的
     public ByteArrayInputStream(byte buf[], int offset, int length) {                
        this.buf = buf;
        this.pos = offset;
        this.count = Math.min(offset + length, buf.length);
        this.mark = offset;
    }
    
    // 从流中读取下一个字节,没有读取到返回 -1
    public synchronized int read() {
        return (pos < count) ? (buf[pos++] & 0xff) : -1;
    }
    
    // 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的
    public synchronized int read(byte b[], int off, int len) {
        // 边界检查
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }

        if (pos >= count) {
            return -1;
        }

        int avail = count - pos;
        if (len > avail) {
            len = avail;
        }
        if (len <= 0) {
            return 0;
        }

        // 从buf拷贝到byte 数组b中
        System.arraycopy(buf, pos, b, off, len);
        pos += len;
        return len;
    }

    // 跳过指定个数的字节不读取
    public synchronized long skip(long n) {
        long k = count - pos;
        if (n < k) {
            k = n < 0 ? 0 : n;
        }

        pos += k;
        return k;
    }

    // 还有稍稍byte在buffer中未读取,即总的count 减去 当前byte位置
    public synchronized int available() {
        return count - pos;
    }

    // 支持mark所以返回true
    public boolean markSupported() { 
        return true;
    }  

    // 在流中当前位置mark, readAheadLimit参数未使用    
    public void mark(int readAheadLimit) {            
        mark = pos;
    }

    // 重置流,即回到mark的位置
    public synchronized void reset() {
        pos = mark;
    }

    // 关闭ByteArrayInputStream不会产生任何动作
    public void close() throws IOException { 

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

BufferedInputStream

BufferedInputStream源码如下

public class BufferedInputStream extends FilterInputStream {

    // 默认的buffer大小
    private static int DEFAULT_BUFFER_SIZE = 8192;

    // 分配的最大数组大小。
    // 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
    private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

    // 内部保存在byte 数组中
    protected volatile byte buf[];

    // 关闭流的方法可能是异步的,所以使用原子AtomicReferenceFieldUpdater提供CAS无锁方式(可以解决CAS的ABA问题)来保证
    private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class,  byte[].class, "buf");

    // 有效byte的大小
    protected int count;

    // 当前位置
    protected int pos;

    // 最后一次,调用mark方法,标记的位置
    protected int markpos = -1;

    /**
     * 该变量惟一入口就是mark(int readLimit),好比调用方法mark(1024),那么后面读取的数据若是
     * 超过了1024字节,那么这次mark就为无效标记,子类能够选择抛弃该mark标记,从头开始。不过具体实现
     * 跟具体的子类有关,在BufferedInputStream中,会抛弃mark标记,从新将markpos赋值为-1
     */
    protected int marklimit;

    // 获取被装饰的stream
    private InputStream getInIfOpen() throws IOException {
        InputStream input = in;
        if (input == null)
            throw new IOException("Stream closed");
        return input;
    }

    // 获取实际内部的buffer数组
    private byte[] getBufIfOpen() throws IOException {
        byte[] buffer = buf;
        if (buffer == null)
            throw new IOException("Stream closed");
        return buffer;
    }

    // 构造函数,buffer是8kb
    public BufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    // 构造函数,指定buffer大小
    public BufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /**
     * 用更多的数据填充缓冲区,考虑到shuffling和其他处理标记的技巧,
     * 假设它是由同步方法调用的。该方法还假设所有数据已经被读入,因此pos >count。
     */
    private void fill() throws IOException {
    	// 得到内部缓冲区buffer
        byte[] buffer = getBufIfOpen();
        // 没有mark的情况下, pos为0
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        // pos >= buffer.length  buffer已经被读取完了 
        else if (pos >= buffer.length)  /* no room left in buffer */
        	// markpos > 0  有标记,标记处在缓存中间
            if (markpos > 0) {  /* can throw away early part of the buffer */
            	// 把buffer中,markpos到pos的部分移动到0-sz处,pos设置为sz,markpos为0
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
                // markpos已经为0了,marklimit比buffer.length小,再读取buffer已经没有地方了
            } else if (buffer.length >= marklimit) {
            	// 清空缓存,清空标记,markpos为-1,pos为0
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
                // markpos已经为0了,marklimit比buffer.length大,而buffer.length已经最大了,不能扩容
            } else if (buffer.length >= MAX_BUFFER_SIZE) {
                throw new OutOfMemoryError("Required array size too large");
               // markpos已经为0了,marklimit比buffer.length大
            } else {            /* grow buffer */
            	// 建立一个长度为min(2*pos,marklimit,MAX_BUFFER_SIZE),的缓存数组,然后把原来0-pos移动到新数组的0-pos处
                int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                        pos * 2 : MAX_BUFFER_SIZE;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                // 用bufUpdater替换buffer
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        // 当前读取上限count为pos
        count = pos;
        // 从内部的输入流,读取pos到buffer.length部分,读取的字节数加到count
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

    // 读取byte
    public synchronized int read() throws IOException {
        // 说明当前buf[]数组大小不够了,须要fill()
        if (pos >= count) {
            fill();
            // 说明没有读取到任何数据
            if (pos >= count)
                return -1;
        }
        return getBufIfOpen()[pos++] & 0xff;
    }

    /**
     * Read characters into a portion of an array, reading from the underlying
     * stream at most once if necessary.
     */
    private int read1(byte[] b, int off, int len) throws IOException {
        int avail = count - pos;
        if (avail <= 0) {
            // 当写入指定数组b的长度大小超过BufferedInputStream中核心缓存数组buf[]的大小而且 markpos < 0,那么就直接从数据流中读取数据给b数组,而不经过buf[]缓存数组,避免buf[]数组急剧增大
            if (len >= getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            fill();
            avail = count - pos;
            if (avail <= 0) return -1;
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
        pos += cnt;
        return cnt;
    }

    // 读取到byte数组b中
    public synchronized int read(byte b[], int off, int len)
        throws IOException
    {
        getBufIfOpen(); // Check for closed stream
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0)
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0)
                return n;
        }
    }

    // 跳过n个
    public synchronized long skip(long n) throws IOException {
        getBufIfOpen(); // Check for closed stream
        if (n <= 0) {
            return 0;
        }
        long avail = count - pos;

        if (avail <= 0) {
            // If no mark position set then don't keep in buffer
            if (markpos <0)
                return getInIfOpen().skip(n);

            // Fill in buffer to save bytes for reset
            fill();
            avail = count - pos;
            if (avail <= 0)
                return 0;
        }

        long skipped = (avail < n) ? avail : n;
        pos += skipped;
        return skipped;
    }

    // buf[]数组剩余字节数+输入流中剩余字节数
    public synchronized int available() throws IOException {
        int n = count - pos;
        int avail = getInIfOpen().available();
        return n > (Integer.MAX_VALUE - avail)
                    ? Integer.MAX_VALUE
                    : n + avail;
    }

    
    // 标记位置,marklimit只有在这里才可以被赋值,readlimit表示mark()方法执行后,最多可以从流中读取的数据
    // 若是超过该字节大小,那么在fill()的时候,就会认为此mark()标记无效,从新将 markpos = -1,pos = 0
    public synchronized void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    // 重置位置
    public synchronized void reset() throws IOException {
        getBufIfOpen(); // 如果已经close, 则直接报错
        if (markpos < 0)
            throw new IOException("Resetting to invalid mark");
        pos = markpos;
    }

    // 支持mark, 所以返回true
    public boolean markSupported() {
        return true;
    }

    // 通过AtomicReferenceFieldUpdater的CAS无锁方式close
    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close();
                return;
            }
            // Else retry in case a new buf was CASed in fill()
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246

AtomicReferenceFieldUpdater具体可以参考:JUC原子类: CAS, Unsafe和原子类详解

参考文章

  • JDK 11 源码
  • https://www.cnblogs.com/winterfells/p/8745297.html
  • https://www.cnblogs.com/AdaiCoffee/p/11369699.html

联系我

添加@pdai微信

PS:添加时请备注Java全栈,谢谢!