Skip to content

Commit 127372b

Browse files
committed
It Works Better with Thread Signaling
Signals are gone. Job completion is now delivered on a notification thread, which pushes the callback onto Python's pending-call queue (Py_AddPendingCall). If queuing fails, it retries an unlimited number of times, sleeping longer after each attempt. There is a hack in gevent.py that wakes up the event loop when only one event is running. This works well with a gevent script running 32 workers that are all doing I/O.
1 parent 601f821 commit 127372b

5 files changed

Lines changed: 135 additions & 128 deletions

File tree

‎README.markdown‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Python Asynchronous I/O bindings (aio.h)
44
Version 0.3
55
**Linux only**
66

7+
You should wait for the callback to finish before queuing more requests in
8+
a tight loop. Otherwise pyaio could hang once Python's pending-call queue is full.
9+
10+
711
Reading
812
-------
913

‎pyaio/core.c‎

Lines changed: 45 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <string.h>
66
#include <fcntl.h>
77
#include <errno.h>
8+
#include <unistd.h>
89

910
typedef struct py_aio_callback {
1011
struct aiocb *cb;
@@ -18,93 +19,61 @@ PyDoc_STRVAR(pyaio_read_doc,
1819
PyDoc_STRVAR(pyaio_write_doc,
1920
"aio_write(fileno, buffer, offset, callback)\n");
2021

21-
static void aio_read_completion_handler(int sig, siginfo_t *info, void *context)
22+
static int _async_callback(void *arg)
2223
{
23-
Pyaio_cb *aio;
24+
Pyaio_cb *aio = (Pyaio_cb *)arg;
2425
struct aiocb *cb;
25-
PyObject *callback, *args;
26+
PyObject *callback, *args, *result, *buffer;
2627
Py_ssize_t read_size = 0;
2728
Py_buffer pbuf;
2829

29-
PyGILState_STATE gstate;
30-
gstate = PyGILState_Ensure();
31-
32-
aio = (Pyaio_cb*) info->si_value.sival_ptr;
33-
3430
cb = aio->cb;
3531
callback = aio->callback;
32+
buffer = aio->buffer;
3633

37-
/* Lets only let python know about how much was actually read */
38-
if (aio_return(cb) > 0) {
39-
read_size = aio_return(cb);
34+
if (buffer == NULL) {
35+
if (aio_return(cb) > 0) {
36+
read_size = aio_return(cb);
37+
}
38+
/* Create a return buffer */
39+
PyBuffer_FillInfo(&pbuf, 0, (void *)cb->aio_buf, read_size, 0,
40+
PyBUF_CONTIG);
41+
args = Py_BuildValue("(Nni)", PyMemoryView_FromBuffer(&pbuf),
42+
aio_return(cb), aio_error(cb));
43+
}
44+
else { /* WRITE */
45+
args = Py_BuildValue("(ni)", aio_return(cb), aio_error(cb));
4046
}
41-
42-
/* Create a buffer to return the data to python */
43-
PyBuffer_FillInfo(&pbuf, 0, (void *)cb->aio_buf, read_size, 0, PyBUF_CONTIG);
44-
45-
args = Py_BuildValue("(Nni)", PyMemoryView_FromBuffer(&pbuf),
46-
aio_return(cb), aio_error(cb));
47-
4847
Py_XINCREF(args);
49-
50-
//TODO add checking for null, kill program
51-
PyObject_CallObject(callback, args);
52-
48+
result = PyObject_CallObject(callback, args);
49+
if (result == NULL) {
50+
printf("Exception in aio callback, dying!\n");
51+
kill(getpid(), SIGKILL); // DIE FAST
52+
}
53+
Py_XDECREF(result);
5354
Py_XDECREF(callback);
5455
Py_XDECREF(args);
55-
56+
if (buffer != NULL) {
57+
Py_XDECREF(buffer);
58+
}
5659
free((struct aiocb *)cb);
5760
free(aio);
58-
59-
PyGILState_Release(gstate);
61+
return 0;
6062
}
6163

62-
static void aio_write_completion_handler(int sig, siginfo_t *info, void *context)
64+
static void aio_completion_handler(sigval_t sigval)
6365
{
6466
Pyaio_cb *aio;
65-
struct aiocb *cb;
66-
PyObject *callback, *args;
67-
68-
PyGILState_STATE gstate;
69-
gstate = PyGILState_Ensure();
70-
71-
aio = (Pyaio_cb*) info->si_value.sival_ptr;
72-
cb = aio->cb;
73-
callback = aio->callback;
74-
args = Py_BuildValue("(ni)", aio_return(cb), aio_error(cb));
75-
76-
Py_XDECREF(aio->buffer);
77-
Py_XINCREF(args);
78-
79-
//TODO add checking for return null, kill program
80-
PyObject_CallObject(callback, args);
81-
82-
Py_XDECREF(callback);
83-
Py_XDECREF(args);
67+
int tries = 1;
68+
aio = (Pyaio_cb*) sigval.sival_ptr;
8469

85-
free((struct aiocb *)cb);
86-
free(aio);
87-
88-
PyGILState_Release(gstate);
89-
}
70+
//We should set an upper limit like 50 retries or something
71+
while(Py_AddPendingCall(&_async_callback, aio) < 0) {
72+
usleep(500*(tries/2)); //Step off timer
73+
tries += 1;
74+
}
9075

91-
static void init_sig_handlers(void)
92-
{
93-
struct sigaction *sa;
94-
95-
/* Install Read Handler */
96-
sa = malloc(sizeof(struct sigaction));
97-
sigemptyset(&sa->sa_mask);
98-
sa->sa_sigaction = aio_read_completion_handler;
99-
sa->sa_flags = SA_SIGINFO | SA_RESTART;
100-
sigaction(SIGRTMIN+1, sa, NULL);
101-
102-
/* Install Write Handler */
103-
sa = malloc(sizeof(struct sigaction));
104-
sigemptyset(&sa->sa_mask);
105-
sa->sa_sigaction = aio_write_completion_handler;
106-
sa->sa_flags = SA_SIGINFO | SA_RESTART;
107-
sigaction(SIGRTMIN+2, sa, NULL);
76+
return;
10877
}
10978

11079
static PyObject *
@@ -115,7 +84,7 @@ pyaio_read(PyObject *dummy, PyObject *args) {
11584

11685
Pyaio_cb *aio;
11786
PyObject *callback, *return_;
118-
87+
Py_XINCREF(args);
11988
if (PyArg_ParseTuple(args, "innO:set_callback", &fd, &offset, &numbytes,
12089
&callback)) {
12190
if (!PyCallable_Check(callback)) {
@@ -125,21 +94,22 @@ pyaio_read(PyObject *dummy, PyObject *args) {
12594
}
12695
Py_XINCREF(callback); /* Add a reference to new callback */
12796
}
128-
97+
Py_XDECREF(args);
12998
aio = malloc(sizeof(Pyaio_cb));
13099

131100
aio->cb = malloc(sizeof(struct aiocb));
132101
bzero((char *) aio->cb, sizeof(struct aiocb));
133102

134103
aio->callback = callback;
104+
aio->buffer = NULL;
135105

136106
aio->cb->aio_buf = malloc((numbytes) * sizeof(char));
137107
aio->cb->aio_fildes = fd;
138108
aio->cb->aio_nbytes = numbytes;
139109
aio->cb->aio_offset = offset;
140-
aio->cb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
141-
aio->cb->aio_sigevent.sigev_signo = SIGRTMIN+1;
110+
aio->cb->aio_sigevent.sigev_notify = SIGEV_THREAD; /* EvIL */
142111
aio->cb->aio_sigevent.sigev_notify_attributes = NULL;
112+
aio->cb->aio_sigevent.sigev_notify_function = aio_completion_handler;
143113
aio->cb->aio_sigevent.sigev_value.sival_ptr = aio;
144114

145115
ret = aio_read(aio->cb);
@@ -165,7 +135,7 @@ pyaio_write(PyObject *dummy, PyObject *args) {
165135

166136
Pyaio_cb *aio;
167137
PyObject *callback, *return_;
168-
138+
Py_XINCREF(args);
169139
if (PyArg_ParseTuple(args, "iOnO:set_callback", &fd, &buffer,
170140
&offset, &callback)) {
171141
if (!PyCallable_Check(callback)) {
@@ -180,7 +150,7 @@ pyaio_write(PyObject *dummy, PyObject *args) {
180150
}
181151
Py_XINCREF(callback); /* Add a reference to new callback */
182152
}
183-
153+
Py_XDECREF(args);
184154
/* Get a Memoryview */
185155
if (!PyMemoryView_Check(buffer)) {
186156
buffer = PyMemoryView_GetContiguous(buffer, PyBUF_READ, 'C');
@@ -199,9 +169,9 @@ pyaio_write(PyObject *dummy, PyObject *args) {
199169
aio->cb->aio_fildes = fd;
200170
aio->cb->aio_nbytes = buffer_view->len;
201171
aio->cb->aio_offset = offset;
202-
aio->cb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
203-
aio->cb->aio_sigevent.sigev_signo = SIGRTMIN+2;
172+
aio->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
204173
aio->cb->aio_sigevent.sigev_notify_attributes = NULL;
174+
aio->cb->aio_sigevent.sigev_notify_function = aio_completion_handler;
205175
aio->cb->aio_sigevent.sigev_value.sival_ptr = aio;
206176

207177
ret = aio_write(aio->cb);
@@ -273,8 +243,6 @@ init_pyaio(void) {
273243
return NULL;
274244
}
275245

276-
init_sig_handlers();
277-
278246
return m;
279247
}
280248

‎pyaio/gevent.py‎

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
from __future__ import absolute_import
22
import os
33
import pyaio
4+
import gevent
45
from gevent.event import AsyncResult
6+
from gevent.coros import RLock
57

68
class aioFile(object):
79
"""a buffered File like object that uses pyaio and gevent"""
810
def __init__(self, filename, mode='r', buffer=16<<10):
911
modes = os.O_LARGEFILE | os.O_CREAT
1012
self._offset = 0
1113
self._buffer_size = buffer
14+
if buffer:
15+
self._buffer_lock = RLock()
1216
self._read = False
1317
self._write = False
1418
self._read_buf = None
1519
self._write_buf = None
1620
self._eof = False # Optimization to limit calls
21+
self._append = False # Append Mode writes ignore offset
1722

1823
if mode.startswith('r') or '+' in mode:
1924
self._read = True
@@ -30,9 +35,10 @@ def __init__(self, filename, mode='r', buffer=16<<10):
3035
modes |= os.O_WRONLY
3136
if '+' in mode:
3237
modes |= os.O_RDWR
33-
self._fd = os.open(filename, modes)
3438
if mode.startswith('a'):
35-
self.seek(0, os.SEEK_END) # Append so goto end
39+
modes |= os.O_APPEND
40+
self._append = True
41+
self._fd = os.open(filename, modes)
3642

3743
def _clear_read_buf(self):
3844
if self._read:
@@ -70,7 +76,8 @@ def seek(self, pos, how=os.SEEK_SET):
7076
raise OSError(14, 'File Position invalid, less than 0')
7177
#Even if the pos didn't change fix the buffers and EOF
7278
self._clear_read_buf()
73-
self.flush()
79+
if not self._append: # DON'T FLUSH on seek with append
80+
self.flush()
7481
self._offset = offset
7582
return offset
7683

@@ -95,30 +102,51 @@ def write(self, buf, offset=None):
95102
"""write a buffer object to file"""
96103
if not self._write:
97104
raise IOError(9, 'Bad file descriptor')
98-
if self._buffer_size and self._read_buf: # We should clear read cache
105+
if not self._append and self._buffer_size and self._read_buf:
106+
# We should clear read cache
99107
self._clear_read_buf()
100108
if offset is None:
101109
offset = self._offset
102110
write_size = self._buffer_size
103111
if not self._buffer_size and buf:
104112
write_size = len(buf)
105-
if offset != self._offset:
113+
if not self._append and offset != self._offset:
106114
self.seek(offset) # Makes sure we write our buffer
107-
if buf:
108-
self._write_buf.extend(buf)
109-
while len(self._write_buf) >= self._buffer_size \
110-
or (self._flush and self._write_buf):
115+
116+
#If we buffer we use the global buffer if not we use a local buffer
117+
if self._buffer_size:
118+
lbuf = self._write_buf
119+
self._buffer_lock.acquire()
120+
if buf:
121+
# The a memoryview of the buffer
122+
lbuf.extend(buf) # pushed to pyaio so we need to lock
123+
else:
124+
lbuf = buf
125+
126+
while lbuf and len(lbuf) >= self._buffer_size \
127+
or (self._flush and lbuf):
111128
result = AsyncResult()
112129
def _write_results(rcode, errno):
113130
result.set((rcode, errno))
114-
pyaio.aio_write(self._fd, memoryview(self._write_buf)[0:write_size],
131+
pyaio.aio_write(self._fd, memoryview(lbuf)[0:write_size],
115132
offset, _write_results)
116-
rcode, errno = result.get() # gevent yield
133+
#WARNING THIS IS A DIRTY DIRTY TRICK
134+
def _no_op():
135+
pass
136+
#100 micro Seconds In the Future to be exact
137+
gevent.spawn_later(0.0001, _no_op); # WAKE THE EVENT LOOP
138+
rcode, errno = result.get() #SLEEP
139+
117140
if rcode < 0: # Some kind of error
118141
raise IOError(errno, 'AIO Write Error %d' % errno)
119142
# Clean up buffer (of actually written bytes)
120-
del self._write_buf[0:rcode]
143+
if self._buffer_size:
144+
del lbuf[0:rcode]
145+
else:
146+
lbuf = None
121147
self._offset = offset = offset + rcode # Move the file offset
148+
if self._buffer_size:
149+
self._buffer_lock.release()
122150
if buf:
123151
return len(buf)
124152
else:
@@ -129,7 +157,7 @@ def read(self, size=0, offset=None):
129157
"""for speed we assume EOF after first short read"""
130158
if not self._read:
131159
raise IOError(9, 'Bad file descriptor')
132-
if self._buffer_size and self._write_buf:
160+
if not self._append and self._buffer_size and self._write_buf:
133161
self.flush()
134162
if offset is None:
135163
offset = self._offset
@@ -138,7 +166,7 @@ def read(self, size=0, offset=None):
138166
if size == 0: # Attempt to read entire file and return in a single return
139167
return self._read_file()
140168
else:
141-
rbuf = bytearray()
169+
rbuf = bytearray() # Holding Place for multiple reads
142170
while len(rbuf) < size: # People get what they ask for
143171
# If we don't want to buffer then just read what they want
144172
if len(self._read_buf) < size - len(rbuf) and not self._eof:
@@ -150,19 +178,28 @@ def _read_results(buf, rcode, errno):
150178
if self._buffer_size: # If we buffer read buffer instead
151179
read_size = self._buffer_size
152180
pyaio.aio_read(self._fd, offset, read_size, _read_results)
153-
buf, rcode, errno = result.get() #gevent YIELD :)
181+
#WARNING THIS IS A DIRTY DIRTY TRICK
182+
def _no_op():
183+
pass
184+
#100 micro Seconds In the Future to be exact
185+
gevent.spawn_later(0.0001, _no_op); # WAKE THE EVENT LOOP
186+
buf, rcode, errno = result.get() #SLEEP
154187
if rcode < 0: # Some kind of error
155188
raise IOError(errno, 'AIO Read Error %d' % errno)
156189
#Rcode will be the bytes read so lets push the offset
157190
self._offset = offset = offset + rcode
158-
self._read_buf.extend(buf)
191+
if self._buffer_size:
192+
self._read_buf.extend(buf)
193+
else:
194+
rbuf = buf # Pass through because we are not buffering
159195
if rcode == 0 or rcode < read_size: # Good Enough
160196
self._eof = True
161197
#Do a buffer read
162198
toread = size - len(rbuf)
163-
rbuf.extend(memoryview(self._read_buf)[0:toread])
164-
#Clean up read buffer
165-
del self._read_buf[0:toread]
199+
if self._buffer_size:
200+
rbuf.extend(memoryview(self._read_buf)[0:toread])
201+
#Clean up read buffer
202+
del self._read_buf[0:toread]
166203
if not self._read_buf and self._eof: # Empty buffer and eof
167204
break
168205
if self._eof and not rbuf:

‎setup.py‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
import sys
33

44
if sys.version < '2.7':
5-
print "pyaio requires python of at least version 2.7"
6-
sys.exit(1)
5+
print "pyaio requires python of at least version 2.7 to build correctly"
76

87
version = '0.3'
98

0 commit comments

Comments
 (0)