AsyncConnection 此物件代表整個 connection,裡面提供了收送(Write/Read)兩個主要介面供應用層(OSD/MON等)使用外,裡面也處理了整個 Ceph Node收送封包的邏輯處理,這部分比較像是一個 finite state machine(FSM),當前狀態是什麼時候,收到的封包是什麼,就切換到什麼狀態來處理。
每個 AsyncConnection 會像底層的 Event Engine註冊一個 call back function,當該 connection 接收到封包後,就會觸發該 function,而此 function就是 Process,所以接下來會從此 function 當作下手點,主要研究雙方連線建立的過程,特別是從 Service side 去觀察 Accept 封包後的流程。
Accept
- 當 Server Side 呼叫此 function後,就會開始等待 client來連線。
- 將 socket 跟 addr 都記錄下來,並且將狀態改成 STATE_ACCEPTING
- 發送一個 external event(read_handler) 給 eventCenter
- 此 read_handler 就是
process
,會一直根據當前 state 的狀態來進行各種處理
- 此 read_handler 就是
1 | 1866 void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) |
Process
當AsyncConnection有收到任何封包時,就會呼叫這個 function,我們這邊假設我們是 Server Side,然後當前 Socket 處於一種 Accepting 的狀態,在此狀態下收到連線的封包後,會怎麼處理。
1 | 0329 void AsyncConnection::process() |
基本上就是跑一個大迴圈,根據當前狀態處理不同的事情,直到狀態已經穩定 (prev_state == state), 如果不符合大部分的 State,則呼叫 _process_connection
來處理,譬如 STATE_ACCEPTING。
一旦建立 Server Socket時並且開始透過 Accept 等待連線時,狀態則初始化為 STATE_ACCEPTING,所以接下來就從這個狀態開始往下研究。
STATE_ACCEPTING
- 因為先前切換到此狀態時,是透過 external event 呼叫的(只會執行一次),所以這邊要將該 readable event handler (process) 正式的丟給 event center 一次
- 接下來要發送一些訊息到對面去,這邊需要下列資訊
- CEPH_BANNER
- Addr + Port
- 呼叫 try_send 去發送訊息
- 若成功 (r == 0), 狀態切換到 STATE_ACCEPTING_WAIT_BANNER_ADDR
- 若失敗 (r > 0),狀態切換到 STATE_WAIT_SEND,並且使用一個 strate_after_send 的變數來記住當成功送出後要切換成什麼狀態
- 若失敗 (r < 0),真的失敗了,就當作失敗處理。
1 | 1215 case STATE_ACCEPTING: |
STATE_ACCEPTING_WAIT_BANNER_ADDR
- 讀取對方的封包的資訊(代表 Client Side 也必須要發送相關訊息)
- CEPH_BANNER
- Addr + Port
- 比較 banner 資訊
- 若對方不知道自己的 addr,則透過 socket 的資訊取得並且記錄下來
- 狀態改成 STATE_ACCEPTING_WAIT_CONNECT_MSG
1 | 1243 case STATE_ACCEPTING_WAIT_BANNER_ADDR: |
STATE_ACCEPTING_WAIT_CONNECT_MSG
- 讀取 connect_msg 大小的資料,並且存放到 AsyncConnection 的成員 connect_msg中,該結構如下,紀錄了如 feature, type 等資訊。
- 最後將狀態轉換成
1
2
3
4
5
6
7
8
9
100099 struct ceph_msg_connect {
0100 __le64 features; /* supported feature bits */
0101 __le32 host_type; /* CEPH_ENTITY_TYPE_* */
0102 __le32 global_seq; /* count connections initiated by this host */
0103 __le32 connect_seq; /* count connections initiated in this session */
0104 __le32 protocol_version;
0105 __le32 authorizer_protocol;
0106 __le32 authorizer_len;
0107 __u8 flags; /* CEPH_MSG_CONNECT_* */
0108 } __attribute__ ((packed));
1 | 1282 case STATE_ACCEPTING_WAIT_CONNECT_MSG: |
STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
- 根據之前讀取到的 connect_msg 來操作
- 如果對方有設定 authorizer_len 的話,則在額外讀取 authorizer 相關的資訊
- 設定 peer 的 host type
- 根據 host type,取得對應的 policy
- 呼叫 handle_connect_msg 處理該 connection_msg
- 最後確認狀態已經不是本來的 STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
- 狀態理論上要因為呼叫了 handle_connect_msg 而變換,正常來說要變成 STATE_ACCEPTING_WAIT_SEQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
351297 case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH:
1298 {
1299 bufferlist authorizer_reply;
1300
1301 if (connect_msg.authorizer_len) {
1302 if (!authorizer_buf.length())
1303 authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1304
1305 r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str());
1306 if (r < 0) {
1307 ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1308 goto fail;
1309 } else if (r > 0) {
1310 break;
1311 }
1312 }
1313
1314 ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
1315 << connect_msg.connect_seq << " global_seq "
1316 << connect_msg.global_seq << dendl;
1317 set_peer_type(connect_msg.host_type);
1318 policy = async_msgr->get_policy(connect_msg.host_type);
1319 ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1320 << ", policy.lossy=" << policy.lossy << " policy.server="
1321 << policy.server << " policy.standby=" << policy.standby
1322 << " policy.resetcheck=" << policy.resetcheck << dendl;
1323
1324 r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply);
1325 if (r < 0)
1326 goto fail;
1327
1328 // state is changed by "handle_connect_msg"
1329 assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
1330 break;
1331 }
- 狀態理論上要因為呼叫了 handle_connect_msg 而變換,正常來說要變成 STATE_ACCEPTING_WAIT_SEQ
STATE_ACCEPTING_WAIT_SEQ
- 從對面讀取其使用的 seq
- 呼叫 discard_requeued_up_to 來處理,根據當前收到的 seq 來做條件
- 將 out_q 一些不符合條件的成員都移除
- 狀態改成 STATE_ACCEPTING_READY
-1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
171333 case STATE_ACCEPTING_WAIT_SEQ:
1334 {
1335 uint64_t newly_acked_seq;
1336 r = read_until(sizeof(newly_acked_seq), state_buffer);
1337 if (r < 0) {
1338 ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
1339 goto fail_registered;
1340 } else if (r > 0) {
1341 break;
1342 }
1343
1344 newly_acked_seq = *((uint64_t*)state_buffer);
1345 ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
1346 discard_requeued_up_to(newly_acked_seq);
1347 state = STATE_ACCEPTING_READY;
1348 break;
1349 }
STATE_ACCEPTING_READY
- 清空 connect_msg
- 狀態改成 STATE_OPEN
- 如果當前 queue 內有資料(也許是先前 existing connection產生的?),馬上送一個 write_handler 將其處理完畢
1 | 1351 case STATE_ACCEPTING_READY: |
STATE_OPEN
- 讀取 TAG,根據 TAG 不同的數值執行不同的事情
- CEPH_MSGR_TAG_MSG: 代表有訊息近來,故將狀態切換成 STATE_OPEN_MESSAGE_HEADER
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
310343 case STATE_OPEN:
0344 {
0345 char tag = -1;
0346 r = read_until(sizeof(tag), &tag);
0347 if (r < 0) {
0348 ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl;
0349 goto fail;
0350 } else if (r > 0) {
0351 break;
0352 }
0353
0354 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
0355 ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
0356 set_last_keepalive(ceph_clock_now());
0357 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
0358 state = STATE_OPEN_KEEPALIVE2;
0359 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
0360 state = STATE_OPEN_KEEPALIVE2_ACK;
0361 } else if (tag == CEPH_MSGR_TAG_ACK) {
0362 state = STATE_OPEN_TAG_ACK;
0363 } else if (tag == CEPH_MSGR_TAG_MSG) {
0364 state = STATE_OPEN_MESSAGE_HEADER;
0365 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
0366 state = STATE_OPEN_TAG_CLOSE;
0367 } else {
0368 ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
0369 goto fail;
0370 }
0371
0372 break;
0373 }
- CEPH_MSGR_TAG_MSG: 代表有訊息近來,故將狀態切換成 STATE_OPEN_MESSAGE_HEADER
STATE_OPEN_MESSAGE_HEADER
- 根據 feature 的值,決定要走新版還是舊版的 header
- 讀取 header 大小的資料,然後將需要的資料都抓出來記錄下來。
- 驗證對方送來資料的 CRC 是否正確
- 將相關資料給 reset (這些結構都跟 header 有關)
- data_buf
- front
- middle
- data
- 記錄收到時間的時間戳
- 將狀態改變成 STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
1 | 0435 case STATE_OPEN_MESSAGE_HEADER: |
STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
在 Policy 中有兩個關於 Throttle 的變數
1
2
3
4
5
6
70085 /**
0086 * The throttler is used to limit how much data is held by Messages from
0087 * the associated Connection(s). When reading in a new Message, the Messenger
0088 * will call throttler->throttle() for the size of the new Message.
0089 */
0090 Throttle *throttler_bytes;
0091 Throttle *throttler_messages;這個 function 檢查是否有 throttle 訊息的數量限制,若有限制且超過上限,則建立一個 time event,並儲存下來。
- 最後將狀態切換到 STATE_OPEN_MESSAGE_THROTTLE_BYTES
1 | 0501 case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE: |
STATE_OPEN_MESSAGE_THROTTLE_BYTES;
- 從 connection 讀取資料,分別對應到 header 中的三個成員
- front
- middle
- data
- 此 function 則是檢查 throttle 訊息的 bytes 數量,若數量超過上限,也是建議一個 time event並存下來,待之後處理
- 最後將狀態切換到 STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
1 | 0523 case STATE_OPEN_MESSAGE_THROTTLE_BYTES: |
STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
- 如果剛剛有在 STATE_OPEN_MESSAGE_THROTTLE_BYTES 讀取到 front/middle/data 的資料的話,則這邊要確認 disaptch 本身的 throttle 有沒有超過,若超過也是送一個 time event 待稍後再來重新試試看
- 紀錄 throttle 的時間戳
- 狀態切換成 STATE_OPEN_MESSAGE_READ_FRONT
1 | 0548 case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE: |
STATE_OPEN_MESSAGE_READ_FRONT
- 接下來就是開始讀取真正的封包資料了,主要是分成三個部分
- front
- middle
- data
- 讀取前段資料,將內容先放到本身的 front 變數中
- 將狀態切成 STATE_OPEN_MESSAGE_READ_MIDDLE
1 | 0568 case STATE_OPEN_MESSAGE_READ_FRONT: |
STATE_OPEN_MESSAGE_READ_MIDDLE
- 讀取中段資料,將內容先放到本身的 middle 變數中
- 將狀態切成 STATE_OPEN_MESSAGE_READ_DATA_PREPARE
1 | 0589 case STATE_OPEN_MESSAGE_READ_MIDDLE: |
STATE_OPEN_MESSAGE_READ_DATA_PREPARE
- 準備好 buffer 供之後讀取 data 用,其中 data 部分除了長度外,還有 offset 也要處理
- 這邊還不會讀取資料,只是會根據讀出來的空間長度預先配置一個空間供之後讀取資料使用
- 將狀態切成 STATE_OPEN_MESSAGE_READ_DATA
1 | 0610 case STATE_OPEN_MESSAGE_READ_DATA_PREPARE: |
STATE_OPEN_MESSAGE_READ_DATA
- 透過一個迴圈嘗試將資料讀取出來並且放到之前所預先配置的空間 DATA
- 最後狀態切換到 STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
1 | 0638 case STATE_OPEN_MESSAGE_READ_DATA: |
STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
- 這個 function 比較長,不過可以說是最後一步驟了。
- 跟 header 一樣,根據 feature 決定使用新舊版本的 footer 格式
- 讀取 footer 的資料,如各區段的CRC等
透過 decode_message 此 function,將收集到的 (front, middle, data, footer.etc) 組合成一個完整的 message 格式的封包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
320270 Message *decode_message(CephContext *cct, int crcflags,
0271 ceph_msg_header& header,
0272 ceph_msg_footer& footer,
0273 bufferlist& front, bufferlist& middle,
0274 bufferlist& data)
....
0315 // make message
0316 Message *m = 0;
0317 int type = header.type;
0318 switch (type) {
0319
0320 // -- with payload --
0321
0322 case MSG_PGSTATS:
0323 m = new MPGStats;
0324 break;
0325 case MSG_PGSTATSACK:
0326 m = new MPGStatsAck;
0327 break;
0328
0329 case CEPH_MSG_STATFS:
0330 m = new MStatfs;
0331 break;
0332 case CEPH_MSG_STATFS_REPLY:
0333 m = new MStatfsReply;
0334 break;
0335 case MSG_GETPOOLSTATS:
0336 m = new MGetPoolStats;
0337 break;
0338 case MSG_GETPOOLSTATSREPLY:
0339 m = new MGetPoolStatsReply;
...針對該 message 設定一些相關屬性
- byte_throttler
- message_throttler
- dispatch_throttle_size
- recv_stamp
- throttle_stamp
- recv_complete_stamp
- 針對 sequence 進行一些判斷,當前的 message 可能中間有遺漏,或是很久以前的 message
將該 messaged 的 sequence 當作目前最後一個收到 sequence
1
20765 // note last received message.
0766 in_seq.set(message->get_seq());將狀態改成 STATE_OPEN
- 將本訊息塞入到 dispatch_queue 內,供應用層去處理
- 到這邊就結束了,接下來 AsyncManager 去定期去檢查 dispatch_queue,當有封包進來後,就會將該封包送給所有註冊的 Dispatcher 去處理。
1 | 0662 case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH: |
上述已經大概跑完了整個 Accept 的流程,當然此流程中是確保沒有任何錯誤,一切都是順利往下進行的。
接下來來探討若 Client 想要連線,則會怎麼處理。
Connect
- 當 AsyncMessager 創立 AsyncConnection時,就會先呼叫此 function 進行連線了,後續若有訊息發送時,會透過
_connect
重新連線。 - 設定 peer 的 addr以及 policy
- 呼叫 _connect 完成最後的連線步驟
1
2
3
4
5
60200 void connect(const entity_addr_t& addr, int type) {
0201 set_peer_type(type);
0202 set_peer_addr(addr);
0203 policy = msgr->get_policy(type);
0204 _connect();
0205 }
_connect
- 將狀態設定為 STATE_CONNECTING,接下來就將 read_handler 送給 Event Engine去處理,由於是透過 external event,所以則會馬上執行 read_handler,也就是 process。
1
2
3
4
5
6
7
8
91856 void AsyncConnection::_connect()
1857 {
1858 ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
1859
1860 state = STATE_CONNECTING;
1861 // rescheduler connection in order to avoid lock dep
1862 // may called by external thread(send_message)
1863 center->dispatch_event_external(read_handler);
1864 }
STATE_CONNECTING
- 檢查 cs 此變數,如果之前有連線過,則關閉先前的連線
- 同時也先刪除之前的 event
- 呼叫 worker 跟對方的 socket 去連線
- 創造一個 read_handler 的 event,來處理接下來收到封包的行為
- read_handler 就是 process
- 狀態切換成 STATE_CONNECTING_RE
1 | 0870 case STATE_CONNECTING: |
STATE_CONNECTING_RE
- 檢查當前 connectionSocket 連線狀態,本身若發現沒有連線則會自己重新連線
- r < 0, 連線依然失敗,則判定有問題, goto 離開
- r == 1, 成功,不做事情
- r == 0, 重連過程中有出現錯誤,可能是 EINPROGRESS 或是 EALREADY。
- 嘗試送出 CEPH_BANNER
- 若成功,狀態切換成 STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
- 若失敗,狀態切換成 STATE_WAIT_SEND,待之後重送後再處理。
1 | 0055 int is_connected() override { |
1 | 0901 case STATE_CONNECTING_RE: |
STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
- 讀取 SERVER 端送來的資訊
- CEPH_BANNER
- Address (Server本身,以及Server看到的 Client),所以有兩份。
- 比較兩邊的 CEPH_BANNER
- 比較 peer addr (server address)
- 我自己 socket 看到的
- 對方送過來的
- 將自己的 address 送給 server
- 若成功,將狀態切換成 STATE_CONNECTING_SEND_CONNECT_MSG
- 若失敗,將狀態切換成 STATE_WAIT_SEND,之後再處理。
1 | 0940 case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY: |
STATE_CONNECTING_SEND_CONNECT_MSG
- 設定 connect_msg 的資訊
- 如同之前所述,包含 featrue, type等。
- 將該 connect_msg 的資訊封裝起來到 bl變數中,透過 try_send送出
- 若成功則將狀態切換到 STATE_CONNECTING_WAIT_CONNECT_REPLY
1 | 1025 case STATE_CONNECTING_SEND_CONNECT_MSG: |
STATE_CONNECTING_WAIT_CONNECT_REPLY
從 Server 端讀取資料,將資料放到 state_buffer,此資料的格式為 ceph_msg_connect_reply
1
2
3
4
5
6
7
8
90110 struct ceph_msg_connect_reply {
0111 __u8 tag;
0112 __le64 features; /* feature bits for this session */
0113 __le32 global_seq;
0114 __le32 connect_seq;
0115 __le32 protocol_version;
0116 __le32 authorizer_len;
0117 __u8 flags;
0118 } __attribute__ ((packed));將此資訊記錄下來後,狀態切換成 STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
1 | 1071 case STATE_CONNECTING_WAIT_CONNECT_REPLY: |
STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
- 若前述回傳的 ceph_msg_connect_reply 有 authorizer 的資訊,則進行額外處理。
- 最後呼叫
handle_connect_reply
進行處理- 理論上 handle_connect_reply 會改變當前狀態,最後變成 STATE_CONNECTING_READY
- 上述處理完畢後,狀態必須要改變,若沒有代表有問題,直接 Assert
1 | 1093 case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH: |
STATE_CONNECTING_READY
- 這邊有個有趣的註解 // hooray!,代表到這一步連線基本上已經完成了,剩下最後一步驟就結束了。
- 對 dispatch_queue 設定當前 connection
- dispatch_queue 這邊有兩種類型,一種是存放 message,一種則是 Type + Connection,這邊屬於第二種
- 在 dispatch_queue 的 loop 中,會針對這兩種去處理,若是 messag 的,則直接將此訊息丟給所有註冊的 dispatcher,反之則根據 type 執行不同的任務
- 這邊放入的是 D_CONNECT 的 event,所以之後會執行
ms_deliver_handle_connect
這支 function。 - 接者這支 function 則是會通知所有 dispathcer 目前有新的連線到來,呼叫對應的
ms_handle_connect
來處理
- 呼叫 AsyncMessegner 內的 ms_deliver_handle_fast_connect
- fast_connect 相對於 connect 是更早會處理的函式,底層可確保此 function 一定會在有任何 message 被處理前先呼叫。
- 如果當前 queue 內有訊息,這時候再發送一個外部的 write_handler把queue給清空。
- 可能是由於先前的 try_send 沒有成功
- 到這邊後,連線就完成了,可以開始供應用層各種發送訊息了。
1 | 1163 case STATE_CONNECTING_READY: |
1 | 0155 while (!mqueue.empty()) { |
1 | 0610 /** |
1 | 0110 /** |
上述在 Accpeting 或者是 Connecting 的過程中,會透過下列兩個方式做一些深層的處理,這些處理同時會改變當前狀態,這邊就簡單大致上看過而已。
handle_connect_reply
- 根據 reply 內的 tag 類型來執行各種不同事情, 大部分都是錯誤相關的處理,若一切都正常的話,則會是
CEPH_MSGR_TAG_READY
,此時會將狀態切換成 STATE_CONNECTING_READY- CEPH_MSGR_TAG_FEATURES
- CEPH_MSGR_TAG_BADPROTOVER
- CEPH_MSGR_TAG_BADAUTHORIZER
- CEPH_MSGR_TAG_RESETSESSION
- CEPH_MSGR_TAG_RETRY_GLOBAL
- CEPH_MSGR_TAG_RETRY_SESSION
- CEPH_MSGR_TAG_WAIT
- CEPH_MSGR_TAG_SEQ
- CEPH_MSGR_TAG_READY
handle_connect_msg
根據 peer type 取得對應的 proto_version,放到 ceph_msg_connect_reply 的變數中
1
2
30671 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
0672 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
0673 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;若兩邊的 proto_version 不一致,則呼叫
_reply_aceept
去處理。- 若對方有要使用 cephX
- 根據不同的 protocol type (OSD/MDS/MOM) 進行不同的處理
- 檢查兩邊的 feature set 是否滿足彼此,若有問題則呼叫
_reply_accept
去處理 - 進行用戶驗證,失敗則呼叫
_reply_accept
- 若以前 peer addr 曾經有 connection 存在過,這時候就要進行一些處理,主要的處理都是基於兩個變數來決定,global_seq 以及 connect_seq
- global_seq 代表的是這個host已經建立過多少條 connection
- connect_seq 代表的是這個 session建立過多少條 connection
- 某些情況下,會嘗試捨棄舊有的 connection 並建立新的 connection 來使用
- 某些情況則是會繼續使用舊有的 connection,然後把一些新的資訊賦予到舊有 connection 的成員中
- 呼叫 accept_conn 將連線給記錄下來放到 conns 中,並且從 accepting_conns 中移除,
- 最後則將狀態改成 STATE_ACCEPTING_WAIT_SEQ
Summary
此 AsyncConnection 內容眾多,目前先主要觀察到整個建立連線的步驟,包含了 Accept 以及 connect 。
有機會再來把 read/write 相關的介面也都看一次,到時候可以更瞭解整體收送的行為。