Cách đọc và ghi dữ liệu bảng trong PySpark

Cach Doc Va Ghi Du Lieu Bang Trong Pyspark



Xử lý dữ liệu trong PySpark nhanh hơn nếu dữ liệu được tải ở dạng bảng. Với điều này, sử dụng Biểu thức SQl, quá trình xử lý sẽ nhanh chóng. Vì vậy, chuyển đổi PySpark DataFrame/RDD thành một bảng trước khi gửi nó để xử lý là cách tiếp cận tốt hơn. Hôm nay, chúng ta sẽ xem cách đọc dữ liệu bảng vào Khung dữ liệu PySpark, ghi Khung dữ liệu PySpark vào bảng và chèn Khung dữ liệu mới vào bảng hiện có bằng các hàm tích hợp sẵn. Đi nào!

Pyspark.sql.DataFrameWriter.saveAsTable()

Đầu tiên, chúng ta sẽ xem cách ghi Khung dữ liệu PySpark hiện có vào bảng bằng cách sử dụng hàm write.saveAsTable(). Nó lấy tên bảng và các tham số tùy chọn khác như chế độ, partionBy, v.v. để ghi DataFrame vào bảng. Nó được lưu trữ dưới dạng tệp sàn gỗ.

Cú pháp:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Tên_bảng là tên của bảng được tạo từ dataframe_obj.
  2. Chúng ta có thể nối thêm/ghi đè lên dữ liệu của bảng bằng cách sử dụng tham số chế độ.
  3. partitionBy lấy một/nhiều cột để tạo phân vùng dựa trên các giá trị trong các cột được cung cấp này.

Ví dụ 1:

Tạo Khung dữ liệu PySpark với 5 hàng và 4 cột. Viết Dataframe này vào một bảng có tên “Agri_Table1”.



nhập pyspark

từ pyspark.sql nhập SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Gợi ý Linux' ).getOrCreate()

# dữ liệu canh tác với 5 hàng và 5 cột

nông nghiệp =[{ 'Loại đất' : 'Đen' , 'Irrigation_availability' : 'KHÔNG' , 'Đất cày cấy' : 2500 , 'Soil_status' : 'Khô' ,
'Quốc gia' : 'HOA KỲ' },

{ 'Loại đất' : 'Đen' , 'Irrigation_availability' : 'Đúng' , 'Đất cày cấy' : 3500 , 'Soil_status' : 'Ướt' ,
'Quốc gia' : 'Ấn Độ' },

{ 'Loại đất' : 'Màu đỏ' , 'Irrigation_availability' : 'Đúng' , 'Đất cày cấy' : 210 , 'Soil_status' : 'Khô' ,
'Quốc gia' : 'Anh' },

{ 'Loại đất' : 'Khác' , 'Irrigation_availability' : 'KHÔNG' , 'Đất cày cấy' : 1000 , 'Soil_status' : 'Ướt' ,
'Quốc gia' : 'HOA KỲ' },

{ 'Loại đất' : 'Cát' , 'Irrigation_availability' : 'KHÔNG' , 'Đất cày cấy' : 500 , 'Soil_status' : 'Khô' ,
'Quốc gia' : 'Ấn Độ' }]



# tạo khung dữ liệu từ dữ liệu trên

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Ghi DataFrame trên vào bảng.

agri_df.coalesce( 1 ).write.saveAsTable( 'Nông nghiệp_Bảng1' )

Đầu ra:







Chúng ta có thể thấy rằng một tệp sàn gỗ được tạo bằng Dữ liệu PySpark trước đó.



Ví dụ 2:

Xem xét Khung dữ liệu trước đó và ghi “Agri_Table2” vào bảng bằng cách phân vùng các bản ghi dựa trên các giá trị trong cột “Quốc gia”.

# Viết DataFrame ở trên vào bảng với tham số partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,phân vùngBy=[ 'Quốc gia' ])

Đầu ra:

Có ba giá trị duy nhất trong cột 'Quốc gia' - 'Ấn Độ', 'Vương quốc Anh' và 'Hoa Kỳ'. Vì vậy, ba phân vùng được tạo ra. Mỗi phân vùng chứa các tập tin sàn gỗ.

Pyspark.sql.DataFrameReader.table()

Hãy tải bảng vào Khung dữ liệu PySpark bằng hàm spark.read.table(). Nó chỉ nhận một tham số là tên đường dẫn/bảng. Nó trực tiếp tải bảng vào Khung dữ liệu PySpark và tất cả các hàm SQL được áp dụng cho Khung dữ liệu PySpark cũng có thể được áp dụng trên Khung dữ liệu đã tải này.

Cú pháp:

spark_app.read.table(path/’Table_name’)

Trong trường hợp này, chúng tôi sử dụng bảng trước đó được tạo từ Khung dữ liệu PySpark. Đảm bảo rằng bạn cần triển khai các đoạn mã kịch bản trước đó trong môi trường của mình.

Ví dụ:

Tải bảng “Agri_Table1” vào DataFrame có tên “loaded_data”.

đã tải_data = linuxhint_spark_app.read.table( 'Nông nghiệp_Bảng1' )

đã tải_data.show()

Đầu ra:

Chúng ta có thể thấy rằng bảng được tải vào Khung dữ liệu PySpark.

Thực thi truy vấn SQL

Bây giờ, chúng tôi thực hiện một số truy vấn SQL trên DataFrame đã tải bằng hàm spark.sql().

# Sử dụng lệnh SELECT để hiển thị tất cả các cột từ bảng trên.

linuxhint_spark_app.sql( 'CHỌN * từ Agri_Table1' ).trình diễn()

# mệnh đề WHERE

linuxhint_spark_app.sql( 'CHỌN * từ Agri_Table1 WHERE Soil_status='Khô' ' ).trình diễn()

linuxhint_spark_app.sql( 'CHỌN * từ Agri_Table1 WHERE Mẫu Anh > 2000' ).trình diễn()

Đầu ra:

  1. Truy vấn đầu tiên hiển thị tất cả các cột và bản ghi từ DataFrame.
  2. Truy vấn thứ hai hiển thị các bản ghi dựa trên cột “Soil_status”. Chỉ có ba bản ghi có phần tử “Khô”.
  3. Truy vấn cuối cùng trả về hai bản ghi có 'Mẫu đất' lớn hơn 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Sử dụng hàm insertInto(), chúng ta có thể nối thêm DataFrame vào bảng hiện có. Chúng ta có thể sử dụng chức năng này cùng với selectExpr() để xác định tên cột và sau đó chèn nó vào bảng. Hàm này cũng lấy tên bảng làm tham số.

Cú pháp:

DataFrame_obj.write.insertInto('Table_name')

Trong trường hợp này, chúng tôi sử dụng bảng trước đó được tạo từ Khung dữ liệu PySpark. Đảm bảo rằng bạn cần triển khai các đoạn mã kịch bản trước đó trong môi trường của mình.

Ví dụ:

Tạo một DataFrame mới với hai bản ghi và chèn chúng vào bảng “Agri_Table1”.

nhập pyspark

từ pyspark.sql nhập SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Gợi ý Linux' ).getOrCreate()

# dữ liệu canh tác với 2 hàng

nông nghiệp =[{ 'Loại đất' : 'Cát' , 'Irrigation_availability' : 'KHÔNG' , 'Đất cày cấy' : 2500 , 'Soil_status' : 'Khô' ,
'Quốc gia' : 'HOA KỲ' },

{ 'Loại đất' : 'Cát' , 'Irrigation_availability' : 'KHÔNG' , 'Đất cày cấy' : 1200 , 'Soil_status' : 'Ướt' ,
'Quốc gia' : 'Nhật Bản' }]

# tạo khung dữ liệu từ dữ liệu trên

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Đất cày cấy' , 'Quốc gia' , 'Irrigation_availability' , 'Loại đất' ,
'Đất_trạng' .write.insertInto( 'Nông nghiệp_Bảng1' )

# Hiển thị Agri_Table1 cuối cùng

linuxhint_spark_app.sql( 'CHỌN * từ Agri_Table1' ).trình diễn()

Đầu ra:

Bây giờ, tổng số hàng có trong DataFrame là 7.

Phần kết luận

Bây giờ bạn đã hiểu cách ghi Khung dữ liệu PySpark vào bảng bằng cách sử dụng hàm write.saveAsTable(). Nó lấy tên bảng và các tham số tùy chọn khác. Sau đó, chúng tôi đã tải bảng này vào Khung dữ liệu PySpark bằng hàm spark.read.table(). Nó chỉ nhận một tham số là tên đường dẫn/bảng. Nếu bạn muốn nối thêm DataFrame mới vào bảng hiện có, hãy sử dụng hàm insertInto().