The Spark Python API currently has limited support for loading large binary data files, so I tried to get numpy.fromfile to help me out.

I first got a list of filenames I'd like to load, e.g.:

In [9]: filenames
Out[9]: 
['A0000.dat',
 'A0001.dat',
 'A0002.dat',
 'A0003.dat',
 'A0004.dat']
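
(For reference, a list like this can be built with glob; this is just a sketch, assuming the .dat files sit in the current working directory.)

import glob
filenames = sorted(glob.glob('A*.dat'))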

I can load these files without problems with a crude iterative union,

for i in range(len(filenames)):
    rdd = sc.parallelize([np.fromfile(filenames[i], dtype="int16", count=-1, sep='')])
    if i == 0:
        allRdd = rdd
    else:
        allRdd = allRdd.union(rdd)
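
Equivalently, the loop above can be written in one expression (a sketch, assuming the same SparkContext sc and numpy imported as np):

import numpy as np

allRdd = sc.union([sc.parallelize([np.fromfile(f, dtype="int16", count=-1, sep='')])
                   for f in filenames])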

It would be great to load the files all at once and onto multiple nodes. I tried to do this as follows,

filenameRdd = sc.parallelize(filenames)
allRdd2 = filenameRdd.map(lambda x: np.fromfile(x, dtype="int16", count=-1, sep=''))

but this didn't work. I get back an RDD

In [20]: allRdd2
Out[20]: PythonRDD[13] at RDD at PythonRDD.scala:43

which keeps throwing errors if I try to manipulate it.

Is my approach theoretically possible? If not, what's a good alternative?

Update: The error message suggests that the worker nodes cannot find the original files (see below). Indeed, this approach works perfectly when I copy all the files into my home directory.
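
One quick way to confirm this is to check the working directory each task actually runs in (a diagnostic sketch, assuming os is imported on the driver):

import os
filenameRdd.map(lambda x: os.getcwd()).distinct().collect()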


Here are details of the error message.

E.g., collect() works with the first method,

allRdd.collect()

[array([87, 52, 82, ..., 96, 25, 20], dtype=int16),
 array([20, 72, 13, ..., 53, 41, 99], dtype=int16),
 array([97, 63, 17, ..., 38, 89, 13], dtype=int16),
 array([88, 66, 97, ..., 22, 93, 93], dtype=int16),
 array([99, 14, 42, ..., 33, 34, 20], dtype=int16)]

But not with the second method,

allRdd2.collect()

15/10/09 08:21:58 ERROR Executor: Exception in task 12.0 in stage 4.0 (TID 113)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark-current/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/local/spark-current/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark-current/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-6-58733c66cd22>", line 3, in <lambda>
IOError: [Errno 2] No such file or directory: 'A0003.dat'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
(Two more essentially identical executor tracebacks followed: task 9.0 (TID 110) failing on 'A0002.dat' and task 6.0 (TID 107) failing on 'A0001.dat', both with IOError: [Errno 2] No such file or directory.)
15/10/09 08:21:58 ERROR TaskSetManager: Task 12 in stage 4.0 failed 1 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-16f16ee2a9b8> in <module>()
----> 1 allRdd2.collect()

/usr/local/spark-current/python/pyspark/rdd.py in collect(self)
    711         """
    712         with SCCallSiteSync(self.context) as css:
--> 713             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    714         return list(_load_from_socket(port, self._jrdd_deserializer))
    715 

/usr/local/spark-current/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/spark-current/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 4.0 failed 1 times, most recent failure: Lost task 12.0 in stage 4.0 (TID 113, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark-current/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/local/spark-current/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark-current/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-6-58733c66cd22>", line 3, in <lambda>
IOError: [Errno 2] No such file or directory: 'A0003.dat'

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

1 Answer

The solution is quite simple: this method works when you provide the full path for each filename.

fullfilenames = [fullpath + '/' + fname for fname in filenames]
filenameRdd = sc.parallelize(fullfilenames)
allRdd2 = filenameRdd.map(lambda x: np.fromfile(x, dtype="int16", count=-1, sep=''))
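
For example, fullpath can be the absolute path of the directory holding the .dat files; the important point is that the path must be reachable from every worker node. A sketch, using a hypothetical shared directory data_dir:

import os

fullpath = os.path.abspath('data_dir')  # hypothetical directory visible to all workers
fullfilenames = [os.path.join(fullpath, fname) for fname in filenames]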