
BUG: users shouldn't be able to specify a local path when writing output on a cluster #4479

Open
lionstory opened this issue May 22, 2022 · 2 comments · May be fixed by #4484
Labels
bug 🦗 Something isn't working · External Pull requests and issues from people who do not regularly contribute to modin · P3 Very minor bugs, or features we can hopefully add some day.

Comments

@lionstory

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): CentOS 7.6.1810
  • Modin version (modin.__version__): 0.12.1
  • Python version: 3.7.6
  • Ray version: 1.12.0 ( Ray cluster: 1 head node + 2 worker nodes)
  • Code we can use to reproduce:

import os
os.environ["MODIN_ENGINE"] = "ray"
import ray
ray.init(address="auto")
import modin.pandas as mpd
from collections import Counter
import pandas as pd
import time

@ray.remote
def modin_csv_parquet_perf(csv_file_prefix: str):
    # modin read csv
    start_time = time.time()
    modin_df = mpd.read_csv(csv_file_prefix + '.csv')
    print('===>Time cost(s) - modin read_csv: ' + str(time.time() - start_time))
    print(f'Modin csv DF len = {len(modin_df)}')

    # modin write parquet
    start_time = time.time()
    modin_df.to_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print('===>Time cost(s) - modin to_parquet: ' + str(time.time() - start_time))

    # modin read parquet
    start_time = time.time()
    modin_df = mpd.read_parquet(csv_file_prefix + '.parquet', engine='fastparquet')
    print('===>Time cost(s) - modin read_parquet: ' + str(time.time() - start_time))
    print(f'Modin parquet DF len = {len(modin_df)}')

if __name__ == '__main__':
    print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    '''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

    CSV_FILE = '/home/term/wanlu/500w50f'

    object_ids = [modin_csv_parquet_perf.remote(CSV_FILE) for _ in range(1)]
    ip_addresses = ray.get(object_ids)
    print('Tasks executed:')
    for ip_address, num_tasks in Counter(ip_addresses).items():
        print('    {} tasks on {}'.format(num_tasks, ip_address))

Describe the problem

modin_df.to_parquet writes the DataFrame (5 million records) to parquet files that are automatically partitioned across the 3 Ray nodes. But when I call mpd.read_parquet, I only get back the 2,499,994 parquet records stored on the head node where the script ran. How can I read all 5 million records across the 3 nodes? I don't see any documentation about these details. Many thanks.
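The symptom above (a full write, then a read that returns only part of the data) can be illustrated with a stdlib-only toy, no Modin or Ray required. The partition count (4) and file names below are hypothetical stand-ins: each "worker" writes its slice to what it thinks is one output directory, but on a real cluster those directories live on different machines, so the reading node only sees its local subset.

```python
import pathlib
import tempfile

rows = list(range(100))  # stand-in for the 5-million-record DataFrame
out = pathlib.Path(tempfile.mkdtemp()) / "data.out"
out.mkdir()

# Pretend 4 workers each wrote their partition to "their own" local disk.
for i in range(4):
    chunk = rows[i * 25:(i + 1) * 25]
    (out / f"part{i}.txt").write_text("\n".join(map(str, chunk)))

# On a cluster, the reading node would only have its own files locally:
local_files = sorted(out.glob("part*.txt"))[:2]
seen = [int(x) for f in local_files for x in f.read_text().splitlines()]
print(f"{len(seen)} of {len(rows)} records visible")  # 50 of 100
```

This mirrors the log below: the write "succeeds" on every node, but the read only finds the head node's partition files.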

Source code / logs

[term@dev-ctb-xs-196-65 wanlu]$ python3 testmodinpandas.py
This cluster consists of
3 nodes in total
24.0 CPU resources in total

(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_csv: 12.558054447174072
(modin_csv_parquet_perf pid=2961) Modin csv DF len = 5000000
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin to_parquet: 8.699442386627197
Tasks executed:
1 tasks on None
(modin_csv_parquet_perf pid=2961) ===>Time cost(s) - modin read_parquet: 6.788694381713867
(modin_csv_parquet_perf pid=2961) Modin parquet DF len = 2499994

@RehanSD
Collaborator

RehanSD commented May 23, 2022

Hi Leon! Thank you so much for opening this issue! When working in a distributed setting, you want to use a distributed filesystem, such as AWS S3, or an NFS, rather than specifying a local file path, so that all of your files end up in the same place, and a distributed read will work. There's not really a good way to read a file split across multiple devices, and we'll be putting in a PR soon to warn users about using local file paths in a distributed setting! Hope that helps!
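The fix described above can be sketched with the same stdlib-only toy. Here a single temp directory stands in for shared storage (an NFS mount or an `s3://` URI, both hypothetical paths): because every worker writes to the same location, any node reading that path afterwards sees the complete dataset.

```python
import pathlib
import tempfile

# A temp dir stands in for a path on shared storage that every node can see.
shared = pathlib.Path(tempfile.mkdtemp()) / "data.out"
shared.mkdir()

rows = list(range(100))
# Every "worker" writes its partition into the same shared directory.
for i in range(4):
    chunk = rows[i * 25:(i + 1) * 25]
    (shared / f"part{i}.txt").write_text("\n".join(map(str, chunk)))

# Any node reading the shared path now sees all partitions.
seen = [int(x) for f in sorted(shared.glob("part*.txt"))
        for x in f.read_text().splitlines()]
print(f"{len(seen)} records visible")  # 100
```

With Modin the same principle would mean passing a shared path (e.g. an S3 URI or a mount present on all nodes) to `to_parquet` and `read_parquet` instead of a node-local path like `/home/term/wanlu/...`.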

RehanSD added a commit to RehanSD/modin that referenced this issue May 23, 2022
…n performing a distributed write

Signed-off-by: Rehan Durrani <rehan@ponder.io>
@lionstory
Author


RehanSD, thanks a lot for the reply.

@vnlitvinov vnlitvinov added the P3 Very minor bugs, or features we can hopefully add some day. label Sep 6, 2022
@mvashishtha mvashishtha added documentation 📜 Updates and issues with the documentation bug 🦗 Something isn't working and removed documentation 📜 Updates and issues with the documentation labels Oct 12, 2022
@mvashishtha mvashishtha changed the title how to read all parquet partition files that wrote by modin df.to_parquet BUG: users shouldn't be able to specify a local path when writing output on a cluster Oct 12, 2022
@anmyachev anmyachev added the External Pull requests and issues from people who do not regularly contribute to modin label Apr 19, 2023
Projects
None yet
5 participants