`
javahigh1
  • 浏览: 1226750 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

unix网络编程

 
阅读更多

一些可以后续观察的文章:
select,poll,epoll的内部机制调研
http://wenku.baidu.com/view/31f51d7da26925c52cc5bf4e.html

select poll epoll
select
每次都需提供fd_set集合,从应用程序空间往内核空间copy浪费时间.
当遇到sigint等中断信号时,select会返回<0
当select等待stdin返回,但是stdin中的数据没有被读走的话,再次select仍然会返回该stdin对应的fd.

poll
使用一个fd动态数组,而不是一个固定大小的fd集合. 虽然copy的东西少了,但仍不免要copy

epoll

epoll的优点:
1.支持一个进程打开大数目的socket描述符(FD)
select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,在Linux上默认值通常是1024。对于那些需要支持上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于1024,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

2.IO效率不随FD数目增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动地去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率优势,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

3.使用mmap加速内核与用户空间的消息传递。
这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核与用户空间mmap同一块内存实现的。而如果你像我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。

4.内核微调

这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。

一个非阻塞IO+select的例子:(来源于ibm网站,略加修改,使之可以编译过去)
from:Example: Nonblocking I/O and select()
来自:http://publib.boulder.ibm.com/infocenter/iseries/v5r4/index.jsp?topic=%2Frzab6%2Fpoll.htm

view plaincopy to clipboardprint?
01.#include <stdio.h>
02.#include <stdlib.h>
03.#include <sys/ioctl.h>
04.#include <sys/socket.h>
05.#include <sys/time.h>
06.#include <netinet/in.h>
07.#include <errno.h>
08.#include <unistd.h> // for close
09.#include <string.h> // for memset
10.#define SERVER_PORT 9877
11.#define TRUE 1
12.#define FALSE 0
13.main (int argc, char *argv[])
14.{
15. int i, len, rc, on = 1;
16. int listen_sd, max_sd, new_sd;
17. int desc_ready, end_server = FALSE;
18. int close_conn;
19. char buffer[80];
20. struct sockaddr_in addr;
21. struct timeval timeout;
22. fd_set master_set, working_set;
23. //struct fd_set master_set, working_set;
24. /*************************************************************/
25. /* Create an AF_INET stream socket to receive incoming */
26. /* connections on */
27. /*************************************************************/
28. listen_sd = socket(AF_INET, SOCK_STREAM, 0);
29. if (listen_sd < 0)
30. {
31. perror("socket() failed");
32. exit(-1);
33. }
34. /*************************************************************/
35. /* Allow socket descriptor to be reuseable */
36. /*************************************************************/
37. rc = setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR,
38. (char *)&on, sizeof(on));
39. if (rc < 0)
40. {
41. perror("setsockopt() failed");
42. close(listen_sd);
43. exit(-1);
44. }
45. /*************************************************************/
46. /* Set socket to be nonblocking. All of the sockets for */
47. /* the incoming connections will also be nonblocking since */
48. /* they will inherit that state from the listening socket. */
49. /*************************************************************/
50. rc = ioctl(listen_sd, FIONBIO, (char *)&on);
51. if (rc < 0)
52. {
53. perror("ioctl() failed");
54. close(listen_sd);
55. exit(-1);
56. }
57. /*************************************************************/
58. /* Bind the socket */
59. /*************************************************************/
60. memset(&addr, 0, sizeof(addr));
61. addr.sin_family = AF_INET;
62. addr.sin_addr.s_addr = htonl(INADDR_ANY);
63. addr.sin_port = htons(SERVER_PORT);
64. rc = bind(listen_sd,
65. (struct sockaddr *)&addr, sizeof(addr));
66. if (rc < 0)
67. {
68. perror("bind() failed");
69. close(listen_sd);
70. exit(-1);
71. }
72. /*************************************************************/
73. /* Set the listen back log */
74. /*************************************************************/
75. rc = listen(listen_sd, 32);
76. if (rc < 0)
77. {
78. perror("listen() failed");
79. close(listen_sd);
80. exit(-1);
81. }
82. /*************************************************************/
83. /* Initialize the master fd_set */
84. /*************************************************************/
85. FD_ZERO(&master_set);
86. max_sd = listen_sd;
87. FD_SET(listen_sd, &master_set);
88. /*************************************************************/
89. /* Initialize the timeval struct to 3 minutes. If no */
90. /* activity after 3 minutes this program will end. */
91. /*************************************************************/
92. timeout.tv_sec = 3 * 60;
93. timeout.tv_usec = 0;
94. /*************************************************************/
95. /* Loop waiting for incoming connects or for incoming data */
96. /* on any of the connected sockets. */
97. /*************************************************************/
98. do
99. {
100. /**********************************************************/
101. /* Copy the master fd_set over to the working fd_set. */
102. /**********************************************************/
103. memcpy(&working_set, &master_set, sizeof(master_set));
104. /**********************************************************/
105. /* Call select() and wait 5 minutes for it to complete. */
106. /**********************************************************/
107. printf("Waiting on select().../n");
108. rc = select(max_sd + 1, &working_set, NULL, NULL, &timeout);
109. /**********************************************************/
110. /* Check to see if the select call failed. */
111. /**********************************************************/
112. if (rc < 0)
113. {
114. perror(" select() failed");
115. break;
116. }
117. /**********************************************************/
118. /* Check to see if the 5 minute time out expired. */
119. /**********************************************************/
120. if (rc == 0)
121. {
122. printf(" select() timed out. End program./n");
123. break;
124. }
125. /**********************************************************/
126. /* One or more descriptors are readable. Need to */
127. /* determine which ones they are. */
128. /**********************************************************/
129. desc_ready = rc;
130. for (i=0; i <= max_sd && desc_ready > 0; ++i)
131. {
132. /*******************************************************/
133. /* Check to see if this descriptor is ready */
134. /*******************************************************/
135. if (FD_ISSET(i, &working_set))
136. {
137. /****************************************************/
138. /* A descriptor was found that was readable - one */
139. /* less has to be looked for. This is being done */
140. /* so that we can stop looking at the working set */
141. /* once we have found all of the descriptors that */
142. /* were ready. */
143. /****************************************************/
144. desc_ready -= 1;
145. /****************************************************/
146. /* Check to see if this is the listening socket */
147. /****************************************************/
148. if (i == listen_sd)
149. {
150. printf(" Listening socket is readable/n");
151. /*************************************************/
152. /* Accept all incoming connections that are */
153. /* queued up on the listening socket before we */
154. /* loop back and call select again. */
155. /*************************************************/
156. do
157. {
158. /**********************************************/
159. /* Accept each incoming connection. If */
160. /* accept fails with EWOULDBLOCK, then we */
161. /* have accepted all of them. Any other */
162. /* failure on accept will cause us to end the */
163. /* server. */
164. /**********************************************/
165. new_sd = accept(listen_sd, NULL, NULL);
166. if (new_sd < 0)
167. {
168. if (errno != EWOULDBLOCK)
169. {
170. perror(" accept() failed");
171. end_server = TRUE;
172. }
173. break;
174. }
175. /**********************************************/
176. /* Add the new incoming connection to the */
177. /* master read set */
178. /**********************************************/
179. printf(" New incoming connection - %d/n", new_sd);
180. FD_SET(new_sd, &master_set);
181. if (new_sd > max_sd)
182. max_sd = new_sd;
183. /**********************************************/
184. /* Loop back up and accept another incoming */
185. /* connection */
186. /**********************************************/
187. } while (new_sd != -1);
188. }
189. /****************************************************/
190. /* This is not the listening socket, therefore an */
191. /* existing connection must be readable */
192. /****************************************************/
193. else
194. {
195. printf(" Descriptor %d is readable/n", i);
196. close_conn = FALSE;
197. /*************************************************/
198. /* Receive all incoming data on this socket */
199. /* before we loop back and call select again. */
200. /*************************************************/
201. do
202. {
203. /**********************************************/
204. /* Receive data on this connection until the */
205. /* recv fails with EWOULDBLOCK. If any other */
206. /* failure occurs, we will close the */
207. /* connection. */
208. /**********************************************/
209. rc = recv(i, buffer, sizeof(buffer), 0);
210. if (rc < 0)
211. {
212. if (errno != EWOULDBLOCK)
213. {
214. perror(" recv() failed");
215. close_conn = TRUE;
216. }
217. break;
218. }
219. /**********************************************/
220. /* Check to see if the connection has been */
221. /* closed by the client */
222. /**********************************************/
223. if (rc == 0)
224. {
225. printf(" Connection closed/n");
226. close_conn = TRUE;
227. break;
228. }
229. /**********************************************/
230. /* Data was received */
231. /**********************************************/
232. len = rc;
233. printf(" %d bytes received/n", len);
234. /**********************************************/
235. /* Echo the data back to the client */
236. /**********************************************/
237. rc = send(i, buffer, len, 0);
238. if (rc < 0)
239. {
240. perror(" send() failed");
241. close_conn = TRUE;
242. break;
243. }
244. // addby nemo
245. break;
246. } while (TRUE);
247. /*************************************************/
248. /* If the close_conn flag was turned on, we need */
249. /* to clean up this active connection. This */
250. /* clean up process includes removing the */
251. /* descriptor from the master set and */
252. /* determining the new maximum descriptor value */
253. /* based on the bits that are still turned on in */
254. /* the master set. */
255. /*************************************************/
256. if (close_conn)
257. {
258. close(i);
259. FD_CLR(i, &master_set);
260. if (i == max_sd)
261. {
262. while (FD_ISSET(max_sd, &master_set) == FALSE)
263. max_sd -= 1;
264. }
265. }
266. } /* End of existing connection is readable */
267. } /* End of if (FD_ISSET(i, &working_set)) */
268. } /* End of loop through selectable descriptors */
269. } while (end_server == FALSE);
270. /*************************************************************/
271. /* Clean up all of the sockets that are open */
272. /*************************************************************/
273. for (i=0; i <= max_sd; ++i)
274. {
275. if (FD_ISSET(i, &master_set))
276. close(i);
277. }
278.}
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <unistd.h> // for close
#include <string.h> // for memset
#define SERVER_PORT 9877
#define TRUE 1
#define FALSE 0
/*
 * Echo server built on non-blocking sockets and select().
 *
 * Listens on SERVER_PORT.  Whenever the listening socket becomes readable
 * every pending connection is accepted; whenever a client socket becomes
 * readable one buffer of data is received and echoed back.  The serving
 * loop ends when select() fails, when no descriptor becomes ready within
 * 3 minutes, or when accept() fails with anything other than "would block".
 *
 * Returns 0 once the serving loop has ended and all sockets are closed.
 * Set-up failures (socket/setsockopt/ioctl/bind/listen) terminate the
 * process through exit(-1).
 */
int main (int argc, char *argv[])
{
    int i, len, rc, on = 1;
    int listen_sd, max_sd, new_sd;
    int desc_ready, end_server = FALSE;
    int close_conn;
    char buffer[80];
    struct sockaddr_in addr;
    struct timeval timeout;
    fd_set master_set, working_set;

    (void)argc;
    (void)argv;

    /* Create an AF_INET stream socket to receive incoming connections on. */
    listen_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sd < 0) {
        perror("socket() failed");
        exit(-1);
    }

    /* Allow the local address to be reused right after a restart. */
    rc = setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0) {
        perror("setsockopt() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Put the listening socket into non-blocking mode so that the accept
     * loop below can drain the backlog and stop on EWOULDBLOCK. */
    rc = ioctl(listen_sd, FIONBIO, (char *)&on);
    if (rc < 0) {
        perror("ioctl() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Bind the socket to SERVER_PORT on every local interface. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(SERVER_PORT);
    rc = bind(listen_sd, (struct sockaddr *)&addr, sizeof(addr));
    if (rc < 0) {
        perror("bind() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Set the listen backlog. */
    rc = listen(listen_sd, 32);
    if (rc < 0) {
        perror("listen() failed");
        close(listen_sd);
        exit(-1);
    }

    /* The master set holds every descriptor we want select() to watch;
     * max_sd tracks the highest one so select() scans no further. */
    FD_ZERO(&master_set);
    max_sd = listen_sd;
    FD_SET(listen_sd, &master_set);

    /* Loop waiting for incoming connections or for incoming data on any
     * of the connected sockets. */
    do {
        /* select() overwrites the set it is given, so work on a copy. */
        memcpy(&working_set, &master_set, sizeof(master_set));

        /* Re-arm the 3 minute timeout on every pass: select() is allowed
         * to modify the timeval (Linux stores the time left in it), so
         * setting it only once would shrink the wait from call to call. */
        timeout.tv_sec = 3 * 60;
        timeout.tv_usec = 0;

        printf("Waiting on select()...\n");
        rc = select(max_sd + 1, &working_set, NULL, NULL, &timeout);
        if (rc < 0) {
            perror(" select() failed");
            break;
        }

        /* Nothing became readable within the 3 minute timeout. */
        if (rc == 0) {
            printf(" select() timed out. End program.\n");
            break;
        }

        /* rc descriptors are readable; stop scanning once all of them
         * have been handled. */
        desc_ready = rc;
        for (i = 0; i <= max_sd && desc_ready > 0; ++i) {
            if (!FD_ISSET(i, &working_set))
                continue;
            desc_ready -= 1;

            if (i == listen_sd) {
                printf(" Listening socket is readable\n");

                /* Accept everything queued on the listening socket.
                 * EWOULDBLOCK/EAGAIN means the backlog is drained; any
                 * other accept() failure ends the server. */
                for (;;) {
                    new_sd = accept(listen_sd, NULL, NULL);
                    if (new_sd < 0) {
                        if (errno != EWOULDBLOCK && errno != EAGAIN) {
                            perror(" accept() failed");
                            end_server = TRUE;
                        }
                        break;
                    }

                    /* FD_SET on a descriptor >= FD_SETSIZE is undefined
                     * behaviour, so such a connection cannot be served. */
                    if (new_sd >= FD_SETSIZE) {
                        fprintf(stderr,
                                " Descriptor %d exceeds FD_SETSIZE, connection dropped\n",
                                new_sd);
                        close(new_sd);
                        continue;
                    }

                    /* Do not rely on the accepted socket inheriting the
                     * non-blocking state from the listener (Linux does
                     * not propagate it); set it explicitly. */
                    if (ioctl(new_sd, FIONBIO, (char *)&on) < 0) {
                        perror(" ioctl() failed");
                        close(new_sd);
                        continue;
                    }

                    printf(" New incoming connection - %d\n", new_sd);
                    FD_SET(new_sd, &master_set);
                    if (new_sd > max_sd)
                        max_sd = new_sd;
                }
            } else {
                /* Not the listening socket, so an existing connection is
                 * readable.  One recv()/send() pair is done per readiness
                 * notification; remaining data is picked up by the next
                 * select() round. */
                printf(" Descriptor %d is readable\n", i);
                close_conn = FALSE;

                rc = recv(i, buffer, sizeof(buffer), 0);
                if (rc < 0) {
                    /* "Would block" just means there is nothing to read
                     * right now; every other error drops the connection. */
                    if (errno != EWOULDBLOCK && errno != EAGAIN) {
                        perror(" recv() failed");
                        close_conn = TRUE;
                    }
                } else if (rc == 0) {
                    /* Orderly shutdown by the client. */
                    printf(" Connection closed\n");
                    close_conn = TRUE;
                } else {
                    len = rc;
                    printf(" %d bytes received\n", len);

                    /* Echo the data back to the client. */
                    rc = send(i, buffer, len, 0);
                    if (rc < 0) {
                        perror(" send() failed");
                        close_conn = TRUE;
                    }
                }

                /* Drop the connection from the master set and, if it was
                 * the highest descriptor, walk max_sd down to the next one
                 * still set (the listening socket always is). */
                if (close_conn) {
                    close(i);
                    FD_CLR(i, &master_set);
                    if (i == max_sd) {
                        while (!FD_ISSET(max_sd, &master_set))
                            max_sd -= 1;
                    }
                }
            }
        }
    } while (end_server == FALSE);

    /* Clean up all of the sockets that are still open. */
    for (i = 0; i <= max_sd; ++i) {
        if (FD_ISSET(i, &master_set))
            close(i);
    }

    return 0;
}

用poll改写的上面select的例子:
代码同样来自IBM:Using poll() instead of select()
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#define SERVER_PORT 9877
//#define SERVER_PORT 12345
#define TRUE 1
#define FALSE 0
/* Number of slots in the pollfd table (listening socket included). */
#define MAX_FDS 200

/*
 * Echo server built on non-blocking sockets and poll().
 *
 * Listens on SERVER_PORT.  Whenever the listening socket reports POLLIN
 * every pending connection is accepted; whenever a client socket reports
 * POLLIN one buffer of data is received and echoed back.  The serving loop
 * ends when poll() fails, when nothing happens for 3 minutes, when the
 * listening socket reports an unexpected event, or when accept() fails
 * with anything other than "would block".
 *
 * Returns 0 once the serving loop has ended and all sockets are closed.
 * Set-up failures terminate the process through exit(-1).
 */
int main (int argc, char *argv[])
{
    int len, rc, on = 1;
    int listen_sd = -1, new_sd = -1;
    int end_server = FALSE, compress_array = FALSE;
    int close_conn;
    char buffer[80];
    struct sockaddr_in addr;
    int timeout;
    struct pollfd fds[MAX_FDS];
    int nfds = 1, current_size = 0, i, j;

    (void)argc;
    (void)argv;

    /* Create an AF_INET stream socket to receive incoming connections on. */
    listen_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sd < 0) {
        perror("socket() failed");
        exit(-1);
    }

    /* Allow the local address to be reused right after a restart. */
    rc = setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0) {
        perror("setsockopt() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Put the listening socket into non-blocking mode so that the accept
     * loop below can drain the backlog and stop on EWOULDBLOCK. */
    rc = ioctl(listen_sd, FIONBIO, (char *)&on);
    if (rc < 0) {
        perror("ioctl() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Bind the socket to SERVER_PORT on every local interface. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(SERVER_PORT);
    rc = bind(listen_sd, (struct sockaddr *)&addr, sizeof(addr));
    if (rc < 0) {
        perror("bind() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Set the listen backlog. */
    rc = listen(listen_sd, 32);
    if (rc < 0) {
        perror("listen() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Slot 0 of the pollfd table is the listening socket. */
    memset(fds, 0, sizeof(fds));
    fds[0].fd = listen_sd;
    fds[0].events = POLLIN;

    /* poll() takes its timeout in milliseconds: 3 minutes. */
    timeout = (3 * 60 * 1000);

    /* Loop waiting for incoming connections or for incoming data on any
     * of the connected sockets. */
    do {
        printf("Waiting on poll()...\n");
        rc = poll(fds, nfds, timeout);
        if (rc < 0) {
            perror(" poll() failed");
            break;
        }

        /* Nothing happened within the 3 minute timeout. */
        if (rc == 0) {
            printf(" poll() timed out. End program.\n");
            break;
        }

        /* Only look at the entries that were handed to poll(); slots
         * appended by the accept loop below have no revents yet. */
        current_size = nfds;
        for (i = 0; i < current_size; i++) {
            if (fds[i].revents == 0)
                continue;

            if (fds[i].fd == listen_sd) {
                /* Anything other than plain readability on the listening
                 * socket is unexpected: log it and end the server. */
                if (fds[i].revents != POLLIN) {
                    printf(" Error! revents = %d\n", fds[i].revents);
                    end_server = TRUE;
                    break;
                }

                printf(" Listening socket is readable\n");

                /* Accept everything queued on the listening socket.
                 * EWOULDBLOCK/EAGAIN means the backlog is drained; any
                 * other accept() failure ends the server. */
                for (;;) {
                    new_sd = accept(listen_sd, NULL, NULL);
                    if (new_sd < 0) {
                        if (errno != EWOULDBLOCK && errno != EAGAIN) {
                            perror(" accept() failed");
                            end_server = TRUE;
                        }
                        break;
                    }

                    /* The table is fixed-size; writing fds[nfds] when it
                     * is full would run off the end of the array. */
                    if (nfds >= MAX_FDS) {
                        fprintf(stderr,
                                " Too many connections, descriptor %d dropped\n",
                                new_sd);
                        close(new_sd);
                        continue;
                    }

                    /* Do not rely on the accepted socket inheriting the
                     * non-blocking state from the listener (Linux does
                     * not propagate it); set it explicitly. */
                    if (ioctl(new_sd, FIONBIO, (char *)&on) < 0) {
                        perror(" ioctl() failed");
                        close(new_sd);
                        continue;
                    }

                    printf(" New incoming connection - %d\n", new_sd);
                    fds[nfds].fd = new_sd;
                    fds[nfds].events = POLLIN;
                    fds[nfds].revents = 0;
                    nfds++;
                }
            } else {
                close_conn = FALSE;

                if (!(fds[i].revents & POLLIN)) {
                    /* POLLERR/POLLHUP/POLLNVAL with nothing left to read:
                     * only this connection is broken, so close it rather
                     * than taking the whole server down. */
                    printf(" Error! revents = %d\n", fds[i].revents);
                    close_conn = TRUE;
                } else {
                    /* One recv()/send() pair is done per readiness
                     * notification; remaining data is picked up by the
                     * next poll() round. */
                    printf(" Descriptor %d is readable\n", fds[i].fd);

                    rc = recv(fds[i].fd, buffer, sizeof(buffer), 0);
                    if (rc < 0) {
                        /* "Would block" just means nothing to read right
                         * now; every other error drops the connection. */
                        if (errno != EWOULDBLOCK && errno != EAGAIN) {
                            perror(" recv() failed");
                            close_conn = TRUE;
                        }
                    } else if (rc == 0) {
                        /* Orderly shutdown by the client. */
                        printf(" Connection closed\n");
                        close_conn = TRUE;
                    } else {
                        len = rc;
                        printf(" %d bytes received\n", len);

                        /* Echo the data back to the client. */
                        rc = send(fds[i].fd, buffer, len, 0);
                        if (rc < 0) {
                            perror(" send() failed");
                            close_conn = TRUE;
                        }
                    }
                }

                /* Mark the slot dead; the table is squeezed after the
                 * scan so indices stay stable while iterating. */
                if (close_conn) {
                    close(fds[i].fd);
                    fds[i].fd = -1;
                    compress_array = TRUE;
                }
            }
        }

        /* Squeeze out the slots whose descriptor was closed.  Live entries
         * are copied down in order with a read index (i) and a write index
         * (j), so consecutive dead slots are handled and nothing beyond
         * the live range is ever read. */
        if (compress_array) {
            compress_array = FALSE;
            j = 0;
            for (i = 0; i < nfds; i++) {
                if (fds[i].fd == -1)
                    continue;
                if (j != i)
                    fds[j] = fds[i];
                j++;
            }
            nfds = j;
        }
    } while (end_server == FALSE); /* End of serving running. */

    /* Clean up all of the sockets that are still open. */
    for (i = 0; i < nfds; i++) {
        if (fds[i].fd >= 0)
            close(fds[i].fd);
    }

    return 0;
}

#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#define SERVER_PORT 9877
//#define SERVER_PORT 12345
#define TRUE 1
#define FALSE 0
/*
 * Echo server built on poll().
 *
 * Listens on SERVER_PORT, accepts every pending connection whenever the
 * listening socket becomes readable, and echoes back whatever each client
 * sends.  The program ends when poll() fails, when no descriptor becomes
 * ready within three minutes, when accept() fails with an unexpected error,
 * or when poll() reports anything other than plain POLLIN for a descriptor.
 *
 * Returns 0 after the serving loop ends; exits with status -1 if the
 * listening socket cannot be set up.
 */
int main(int argc, char *argv[])
{
    int len, rc, on = 1;
    int listen_sd = -1, new_sd = -1;
    int end_server = FALSE, compress_array = FALSE;
    int close_conn;
    char buffer[80];
    struct sockaddr_in addr;
    int timeout;
    struct pollfd fds[200];
    /* Capacity is derived from the array so the two cannot drift apart. */
    const int max_fds = (int)(sizeof(fds) / sizeof(fds[0]));
    int nfds = 1, current_size = 0, i, j;

    (void)argc;
    (void)argv;

    /* Create an AF_INET stream socket to receive incoming connections on. */
    listen_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sd < 0)
    {
        perror("socket() failed");
        exit(-1);
    }

    /* Allow the local address to be reused right after a restart. */
    rc = setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0)
    {
        perror("setsockopt() failed");
        close(listen_sd);
        exit(-1);
    }

    /*
     * Make the listening socket non-blocking so the accept() loop below can
     * stop on EWOULDBLOCK.  NOTE: on Linux, sockets returned by accept() do
     * not inherit this state; that is harmless here because each connection
     * is read only once per POLLIN notification.
     */
    rc = ioctl(listen_sd, FIONBIO, (char *)&on);
    if (rc < 0)
    {
        perror("ioctl() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Bind the socket to SERVER_PORT on every local interface. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(SERVER_PORT);
    rc = bind(listen_sd, (struct sockaddr *)&addr, sizeof(addr));
    if (rc < 0)
    {
        perror("bind() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Set the listen backlog. */
    rc = listen(listen_sd, 32);
    if (rc < 0)
    {
        perror("listen() failed");
        close(listen_sd);
        exit(-1);
    }

    /* Slot 0 of the pollfd table always holds the listening socket. */
    memset(fds, 0, sizeof(fds));
    fds[0].fd = listen_sd;
    fds[0].events = POLLIN;

    /* poll() timeout in milliseconds: three minutes without activity ends the program. */
    timeout = (3 * 60 * 1000);

    do
    {
        printf("Waiting on poll()...\n");
        rc = poll(fds, (nfds_t)nfds, timeout);
        if (rc < 0)
        {
            perror(" poll() failed");
            break;
        }
        if (rc == 0)
        {
            printf(" poll() timed out. End program.\n");
            break;
        }

        /*
         * Only the entries that existed when poll() was called carry valid
         * revents; connections appended below are examined after the next
         * poll() call.
         */
        current_size = nfds;
        for (i = 0; i < current_size; i++)
        {
            if (fds[i].revents == 0)
                continue;

            /* Anything other than plain POLLIN is unexpected: log and stop. */
            if (fds[i].revents != POLLIN)
            {
                printf(" Error! revents = %d\n", fds[i].revents);
                end_server = TRUE;
                break;
            }

            if (fds[i].fd == listen_sd)
            {
                printf(" Listening socket is readable\n");

                /*
                 * Accept every queued connection.  EWOULDBLOCK means the
                 * queue is drained; any other failure ends the server.
                 */
                for (;;)
                {
                    new_sd = accept(listen_sd, NULL, NULL);
                    if (new_sd < 0)
                    {
                        if (errno != EWOULDBLOCK && errno != EAGAIN)
                        {
                            perror(" accept() failed");
                            end_server = TRUE;
                        }
                        break;
                    }

                    /*
                     * The table is full: storing the descriptor would write
                     * past the end of fds[].  Drop the connection instead of
                     * leaving it queued, which would make poll() report the
                     * listening socket as readable forever.
                     */
                    if (nfds >= max_fds)
                    {
                        printf(" Too many connections - rejecting %d\n", new_sd);
                        close(new_sd);
                        continue;
                    }

                    printf(" New incoming connection - %d\n", new_sd);
                    fds[nfds].fd = new_sd;
                    fds[nfds].events = POLLIN;
                    fds[nfds].revents = 0;
                    nfds++;
                }
            }
            else
            {
                /* An established connection has data (or EOF) to read. */
                printf(" Descriptor %d is readable\n", fds[i].fd);
                close_conn = FALSE;

                rc = recv(fds[i].fd, buffer, sizeof(buffer), 0);
                if (rc < 0)
                {
                    /* EWOULDBLOCK just means "nothing to read right now". */
                    if (errno != EWOULDBLOCK && errno != EAGAIN)
                    {
                        perror(" recv() failed");
                        close_conn = TRUE;
                    }
                }
                else if (rc == 0)
                {
                    /* Orderly shutdown by the client. */
                    printf(" Connection closed\n");
                    close_conn = TRUE;
                }
                else
                {
                    len = rc;
                    printf(" %d bytes received\n", len);

                    /* Echo the data back to the client. */
                    rc = send(fds[i].fd, buffer, (size_t)len, 0);
                    if (rc < 0)
                    {
                        perror(" send() failed");
                        close_conn = TRUE;
                    }
                }

                /* Mark the slot as free; it is squeezed out after the scan. */
                if (close_conn)
                {
                    close(fds[i].fd);
                    fds[i].fd = -1;
                    compress_array = TRUE;
                }
            }
        }

        /*
         * Squeeze out the slots freed above.  A single pass copies each live
         * entry (fd, events and revents together) down to the next free
         * position, so consecutive closed slots are all removed and no
         * element beyond the live range is ever read.
         */
        if (compress_array)
        {
            compress_array = FALSE;
            j = 0;
            for (i = 0; i < nfds; i++)
            {
                if (fds[i].fd == -1)
                    continue;
                if (j != i)
                    fds[j] = fds[i];
                j++;
            }
            nfds = j;
        }
    } while (end_server == FALSE);

    /* Close every socket that is still open, including the listener. */
    for (i = 0; i < nfds; i++)
    {
        if (fds[i].fd >= 0)
            close(fds[i].fd);
    }
    return 0;
}

再来一个epoll的例子:
这个转自vimer的blog(略加修改): http://www.vimer.cn/2009/11/epoll%e4%bd%bf%e7%94%a8%e5%ae%9e%e4%be%8b%e8%af%b4%e6%98%8e.html

view plaincopy to clipboardprint?
01.#include <stdio.h>
02.#include <stdlib.h>
03.#include <errno.h>
04.#include <string.h>
05.#include <sys/types.h>
06.#include <netinet/in.h>
07.#include <sys/socket.h>
08.#include <sys/wait.h>
09.#include <unistd.h>
10.#include <arpa/inet.h>
11.#include <openssl/ssl.h>
12.#include <openssl/err.h>
13.#include <fcntl.h>
14.#include <sys/epoll.h>
15.#include <sys/time.h>
16.#include <sys/resource.h>
17.#define MAXBUF 1024
18.#define MAXEPOLLSIZE 10000
19.#define ECHO_PORT 9877
/*
 * setnonblocking - switch a descriptor to non-blocking mode.
 *
 * The current file status flags are read with F_GETFL and written back with
 * O_NONBLOCK added, so status flags that are already set (for example
 * O_APPEND) are preserved.  F_GETFD must not be used here: it returns the
 * descriptor flags (FD_CLOEXEC), which live in a different flag space than
 * the one F_SETFL writes.
 *
 * Returns 0 on success, -1 if either fcntl() call fails.
 */
int setnonblocking(int sockfd)
{
    int flags = fcntl(sockfd, F_GETFL, 0);

    if (flags == -1)
    {
        return -1;
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        return -1;
    }
    return 0;
}
/*
 * handle_message - receive one chunk from a connected socket and echo it back.
 *
 * At most MAXBUF bytes are read with a single recv() call, logged to stdout
 * and sent back to the peer.
 *
 * Returns the number of bytes received (> 0) on success, or -1 otherwise:
 *   - recv() would block or was interrupted: the socket is left OPEN and
 *     errno is set to EAGAIN, so the caller can tell "nothing to read yet"
 *     from "connection finished";
 *   - the peer closed the connection: the socket is closed, errno is 0;
 *   - recv() or send() failed: the socket is closed and errno holds the
 *     error code of the failing call.
 */
int handle_message(int new_fd)
{
    char buf[MAXBUF + 1];
    ssize_t len;
    int saved_errno;

    /* Receive the client's message, leaving room for a terminating NUL. */
    len = recv(new_fd, buf, MAXBUF, 0);
    if (len > 0)
    {
        buf[len] = '\0'; /* make the payload printable with %s */
        printf("%d接收消息成功:'%s',共%d个字节的数据\n",
               new_fd, buf, (int)len);

        /*
         * MSG_NOSIGNAL: a peer that has already disconnected must produce an
         * EPIPE error here, not a SIGPIPE that terminates the whole server.
         */
        if (send(new_fd, buf, (size_t)len, MSG_NOSIGNAL) < 0)
        {
            saved_errno = errno;
            perror("send() failed.");
            close(new_fd);
            errno = saved_errno; /* perror()/close() may have changed it */
            return -1;
        }
        return (int)len;
    }

    if (len < 0)
    {
        saved_errno = errno;
        if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK ||
            saved_errno == EINTR)
        {
            /* Nothing to read right now: the connection is still alive. */
            errno = EAGAIN;
            return -1;
        }
        printf("消息接收失败!错误代码是%d,错误信息是'%s'\n",
               saved_errno, strerror(saved_errno));
        close(new_fd);
        errno = saved_errno;
        return -1;
    }

    /* len == 0: orderly shutdown by the peer. */
    close(new_fd);
    errno = 0; /* do not leave a stale EAGAIN behind for the caller's check */
    return -1;
}
67.int main(int argc, char **argv)
68.{
69. int listener, new_fd, kdpfd, nfds, n, ret, curfds;
70. socklen_t len;
71. struct sockaddr_in my_addr, their_addr;
72. unsigned int myport, lisnum;
73. struct epoll_event ev;
74. struct epoll_event events[MAXEPOLLSIZE];
75. struct rlimit rt;
76. myport = ECHO_PORT;
77. lisnum = 2;
78. /* 设置每个进程允许打开的最大文件数 */
79. rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
80. if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
81. {
82. perror("setrlimit");
83. exit(1);
84. }
85. else
86. {
87. printf("设置系统资源参数成功!/n");
88. }
89. /* 开启 socket 监听 */
90. if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1)
91. {
92. perror("socket");
93. exit(1);
94. }
95. else
96. {
97. printf("socket 创建成功!/n");
98. }
99. setnonblocking(listener);
100. bzero(&my_addr, sizeof(my_addr));
101. my_addr.sin_family = PF_INET;
102. my_addr.sin_port = htons(myport);
103. my_addr.sin_addr.s_addr = INADDR_ANY;
104. if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
105. {
106. perror("bind");
107. exit(1);
108. }
109. else
110. {
111. printf("IP 地址和端口绑定成功/n");
112. }
113. if (listen(listener, lisnum) == -1)
114. {
115. perror("listen");
116. exit(1);
117. }
118. else
119. {
120. printf("开启服务成功!/n");
121. }
122. /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */
123. kdpfd = epoll_create(MAXEPOLLSIZE);
124. len = sizeof(struct sockaddr_in);
125. ev.events = EPOLLIN | EPOLLET;
126. ev.data.fd = listener;
127. if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0)
128. {
129. fprintf(stderr, "epoll set insertion error: fd=%d/n", listener);
130. return -1;
131. }
132. else
133. {
134. printf("监听 socket 加入 epoll 成功!/n");
135. }
136. curfds = 1;
137. while (1)
138. {
139. /* 等待有事件发生 */
140. nfds = epoll_wait(kdpfd, events, curfds, -1);
141. if (nfds == -1)
142. {
143. perror("epoll_wait");
144. break;
145. }
146. /* 处理所有事件 */
147. for (n = 0; n < nfds; ++n)
148. {
149. if (events[n].data.fd == listener)
150. {
151. new_fd = accept(listener, (struct sockaddr *) &their_addr,&len);
152. if (new_fd < 0)
153. {
154. perror("accept");
155. continue;
156. }
157. else
158. {
159. printf("有连接来自于: %d:%d, 分配的 socket 为:%d/n",
160. inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd);
161. }
162. setnonblocking(new_fd);
163. ev.events = EPOLLIN | EPOLLET;
164. ev.data.fd = new_fd;
165. if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_fd, &ev) < 0)
166. {
167. fprintf(stderr, "把 socket '%d' 加入 epoll 失败!%s/n",
168. new_fd, strerror(errno));
169. return -1;
170. }
171. curfds++;
172. }
173. else
174. {
175. ret = handle_message(events[n].data.fd);
176. if (ret < 1 && errno != 11)
177. {
178. epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
179. curfds--;
180. }
181. }
182. }
183. }
184. close(listener);
185. return 0;
186.}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics