หลังจากเป็นมือใหม่มาราวๆ 2 สัปดาห์ ก็พอจะมาบอกได้ว่ามันน่าจะเริ่มจากตรงไหนอะไรยังไงนะ เนื่องเพราะแฟนเรียน Big Data นั่นแหละ เราเลยต้องรู้ด้วย
เนื่องจากเคนใช้ iMac เพราะงั้นทุกโปรแกรมที่ใช้ก็จะอิงจาก Mac เป็นหลักนะ ใครใช้ Windows ก็อาจจะต้องไปลองหาๆ เพิ่มเติมเอานะ
จะพยายามพิมพ์เอาไว้เพื่อให้อ่านแล้วเริ่มต้นได้ง่ายที่สุดนะ และทุกอย่างที่อยู่ในบทความเป็นความเข้าใจจากที่เรียนรู้มา #รู้ไม่มากแต่อยากมาเล่า 555+ ซึ่งถ้าผิด ก็ไว้จะมาอัพเดทแก้นะ
ประเภทของข้อมูล Big Data
- ปริมาณ (Volume)
- ความเร็ว (Velocity)
- ความหลากหลาย (Variety)
- ความถูกต้อง (Veracity)
- คุณค่า (Value)
- ความแปรผันได้ (Variability)
สิ่งที่ต้องรู้จักเบื้องต้น
- โปรแกรม Virtual Box / Parallels สำหรับสร้างเครื่องเสมือนบนเครื่องของเราเพื่อติดตั้ง Ubuntu ใช้ตัวไหนก็ได้ แต่ตัว Parallels ที่เป็นตัวเสียเงิน ดีกว่ามาก
- OS Linux – Ubuntu เวอร์ชั่น LTS
- Hadoop
- Mysql / MongoDB
- Python
- Package ต่างๆ เช่น pandas, numpy, matplotlib, pyspark ฯลฯ
- Sqoop
- Hive
Hadoop
คือ โปรแกรมบริหารจัดการเครื่อง Server หลายๆ เครื่องให้มันทำงานร่วมกัน ทำให้เรามองเห็นเป็นเครื่องๆ เดียว ทำให้เราเก็บข้อมูลขนาดใหญ่ และกระจายไปเก็บในหลายๆ เครื่อง ทำให้เก็บข้อมูลได้ใหญ่ขึ้น และเข้าถึงได้เร็วขึ้น โดยตัวฮาดูปจะมีที่เก็บไฟล์ของตัวเองชื่อ HDFS (Hadoop Distributed File System)
เริ่มขั้นตอนติดตั้งซอฟท์แวร์
สำหรับขั้นตอนการติดตั้งเคนทำเป็นขั้นตอนไว้ใน Github เรียบร้อยแล้ว สามารถเข้าไปดูได้เลย https://github.com/kanexkane/big-data-101
PySpark กับ Pandas ต่างกันไหม
เท่าที่รู้ตอนนี้คือความสามารถเหมือนกันเลย ใช้อ่านและประมวลผลข้อมูลเหมือนกัน คำสั่งยังคล้ายๆ กันเลย อ่านๆ ไป ก็เจอแค่ว่า PySpark มันเร็วกว่า Pandas มาก แค่นั้นเอง
PySpark
โค้ดสำหรับเริ่มการใช้ pyspark ใน Google Colab
# ติดตั้ง pyspark ใน Google Colab !pip install pyspark # สร้าง Session ใส่ตัวแปร spark spark = SparkSession.builder.appName("APP_NAME_MASTER").getOrCreate() # แสดงผลออกมาดู spark
ใช้อะไรบ้างก็ import เข้าไป
คืออันนี้มันจะเกินๆ หน่อยนะ แต่ถ้าอันไหนใครไม่ใช้ก็ตัดออกละกัน อันนี้เป็นอันที่เราใช้ทั้งหมดที่เคยเขียนโค้ดมา
from pyspark.sql import SparkSession from pyspark.sql.types import DoubleType, IntegerType from pyspark.sql.functions import isnan, count, col, regexp_extract, when, lit from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StringIndexer, QuantileDiscretizer, OneHotEncoder from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, NaiveBayes, GBTClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator, ClusteringEvaluator from pyspark.ml.regression import GeneralizedLinearRegression
อ่านไฟล์ และทำความสะอาด
# อ่านไฟล์ที่ต้องการ df = spark.read.csv('E-Commerce Data.csv', inferSchema=True, header=True) # แสดงโครงสร้างของไฟล์ที่อ่านได้ออกมาดู df.printSchema() # เช็คว่ามีคอลัมภ์ไหนมีค่าว่างบ้าง df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show() # เพิ่มคอลัมภ์ โดยใช้เงื่อนไขร่วมด้วย df = df.withColumn("Wanted", when( (df["Quantity"] > 50) , 1).otherwise(0)) # เติมค่าช่องว่าง 'Column': 'Value' df = df.na.fill({'Description': 'NO DESCRIPTION', 'CustomerID': 0 })
ทำคอลัมภ์ Features
# แปลงฟิลด์ที่ต้องการใช้เป็น Features จาก String เป็น Index โดยการวนลูป indexers = [ StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in ['StockCode', 'Country'] ] # ทำ One Hot Encoder สำหรับ Feature ที่ StringIndex มีค่ามากกว่า 2 ตัว one_hot_encoders = OneHotEncoder( inputCols=[column+"_index" for column in ['StockCode', 'Country'] ], outputCols=[ column+"_prep" for column in ['StockCode', 'Country']] ) # ประกอบ Feature เป็นคอลัมภ์ จะได้เอาเข้า dataframe ได้ assembler = VectorAssembler( inputCols=['StockCode_prep', 'Country_prep', 'Quantity'], outputCol=feature_column ) # ต่อท่อ pipeline = Pipeline(stages=indexers + [one_hot_encoders, assembler]) # ทำงานตามท่อ และปรับรูปจาก DataFrame สร้างฟิลด์ features ขึ้นมา feature_df = pipeline.fit(df).transform(df)
กำหนดคอลัมภ์ feature, label เพื่อง่ายต่อการเขียน
# เลือกคอลัมภ์ที่จะเอามาเป็นผลลัพธ์ที่ต้องการ label_column = 'Wanted' feature_column = 'features' prediction_column = 'prediction'
แบ่งข้อมูลเพื่อฝึกและทดสอบ
# แยกข้อมูลเพื่อเทรน และเทส อัตราส่วน 7/3 (trainingData, testData) = feature_df.randomSplit([0.7, 0.3])
เทคนิค Logistic Regression
# เทคนิค Logistic regression lr = LogisticRegression(labelCol=label_column, featuresCol=feature_column) # เทรน lr_model = lr.fit(trainingData) # เทส prediction lr_prediction = lr_model.transform(testData)
# ประเมินผลโมเดล Logistic Regression ว่าตรงกี่เปอร์เซ็นต์ lr_evaluator = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol=prediction_column)
# ประเมินผลโมเดล Logistic Regression ว่าตรงกี่เปอร์เซ็นต์ lr_evaluator = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol=prediction_column) lr_evaluator_f1 = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "f1"}) lr_evaluator_accuracy = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "accuracy"}) lr_evaluator_precision = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "precisionByLabel"}) lr_evaluator_recall = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "recallByLabel"})
print("Accuracy of Logistic Regression is = %g" % (lr_evaluator_accuracy*100), '%') print("Precision of Logistic Regression = %g" % (lr_evaluator_precision*100), '%') print("F1 of Logistic Regression = %g" % (lr_evaluator_f1*100), '%') print("Recall of Logistic Regression = %g" % (lr_evaluator_recall*100), '%') print("Total rows of Logistic Regression prediction = %g" % lr_prediction.count())