Contact Us
ขอขอบคุณที่ให้ความสนใจใน Cloud Ace
เราจะติดต่อท่านกลับไปโดยเร็วที่สุด
หากคุณเคยทำงานที่ต้องดึงข้อมูลจากฐานข้อมูล, แปลงข้อมูล และส่งต่อไปยังปลายทางต่างๆ คงรู้ดีว่า Workflow แบบนี้อาจยุ่งยากและซับซ้อน แต่ด้วย Cloud Composer การจัดการขั้นตอนเหล่านี้จะเป็นเรื่องที่สะดวกและราบรื่นมากขึ้น พร้อมทั้งช่วยลดความยุ่งยากในการประสานงานระหว่างระบบต่างๆ ที่เชื่อมต่อกันใน Google Cloud Platform แล้ว Cloud Composer คืออะไร?
Cloud Composer คือบริการที่ช่วยจัดการ Workflow แบบอัตโนมัติบน Google Cloud Platform ที่อิงจาก Apache Airflow ซึ่งเป็นเครื่องมือยอดนิยมในสายงาน Data Engineering และ DevOps ช่วยให้คุณสามารถออกแบบ, จัดการ และติดตาม Workflow ได้อย่างมีประสิทธิภาพ ทำให้การทำงานที่ต้องประสานข้อมูลหลายขั้นตอนกลายเป็นเรื่องง่ายยิ่งขึ้น
ด้วย Cloud Composer คุณสามารถ:
✅ สร้าง Workflow ที่ช่วยจัดการกระบวนการทำงานที่ซับซ้อนได้อย่างมีระเบียบ
✅ ตั้งเวลาทำงาน (Scheduling) ให้ทำงานอัตโนมัติตามช่วงเวลาที่ต้องการ
✅ ติดตามและปรับแก้ไข Workflow ได้อย่างง่ายดาย
พูดง่ายๆ ก็คือ Cloud Composer ช่วยให้การจัดการ Pipeline และ Workflow ต่างๆ เป็นไปอย่างมีประสิทธิภาพ นอกจากนี้ยังลดความยุ่งยากในกระบวนการทำงานที่ต้องประสานหลายระบบอีกด้วย
เราได้เกริ่นถึง Cloud Composer ไปแล้วว่าเป็นเครื่องมือที่ช่วยจัดการ Workflow แต่เพื่อให้ทุกคนเข้าใจลึกซึ้งมากขึ้น คลาวด์ เอซจะพาทุกคนมาเจาะลึกเพิ่มเติม เพื่อให้เห็นภาพชัดเจนยิ่งขึ้นว่ามันทำงานยังไงและเราสามารถเริ่มต้นการใช้งานได้อย่างไร?
ฟีเจอร์เด่นๆ ที่คุณควรรู้:
1. Integration กับบริการของ GCP
Cloud Composer ถูกออกแบบมาเพื่อทำงานร่วมกับบริการต่างๆ ของ GCP ได้อย่างราบรื่น เช่น:
✅ BigQuery: ส่งข้อมูลเพื่อการวิเคราะห์
✅ Cloud Storage: เก็บและดึงข้อมูล
✅ Pub/Sub: ประสานงานระบบด้วยการส่งข้อความ
✅ Dataflow: ประมวลผลข้อมูลขนาดใหญ่
การเชื่อมต่อกับบริการเหล่านี้ช่วยให้การตั้งค่า Pipeline ที่ต้องใช้หลายๆ บริการร่วมกันเป็นเรื่องง่ายและลดความซับซ้อนในการทำงาน
2. รองรับ Workflow ที่ซับซ้อน
ด้วย Apache Airflow เราสามารถ:
✅ สร้าง DAG (Directed Acyclic Graph): แผนผังที่ช่วยแสดงลำดับขั้นตอนการทำงานใน Workflow อย่างชัดเจน
✅ ตั้งเงื่อนไขการทำงาน: กำหนดให้บาง Task ต้องเสร็จก่อนถึงจะเริ่ม Task ถัดไปได้ เพื่อให้ทุกขั้นตอนเป็นไปตามลำดับที่ต้องการ
✅ ควบคุม Task Dependency: ทำให้ Workflow มีระเบียบและลดข้อผิดพลาดที่อาจเกิดขึ้นจากการทำงานไม่ตรงตามลำดับ
3. รองรับการเขียนโค้ดด้วย Python
Cloud Composer ใช้ Apache Airflow ซึ่งรองรับการเขียน Workflow ด้วย Python ได้โดยตรง
✅ สร้าง Task ต่างๆ ได้ง่าย เช่น การดึงข้อมูล, แปลงข้อมูล, หรือการส่งอีเมล
✅ เพิ่มฟังก์ชันเฉพาะตัว ผ่านการเขียนโค้ดที่สามารถปรับแต่งได้ตามต้องการ
4. การตั้งเวลาและจัดการ Workflow อัตโนมัติ
✅ กำหนดเวลาให้ Workflow ทำงานได้อัตโนมัติ เช่น ทุกชั่วโมง, ทุกวัน, หรือทุกเดือน
✅ รองรับ Cron jobs และ Time-based scheduling
✅ เหมาะสำหรับการตั้งเวลา ETL (Extract, Transform, Load) หรือการส่งรายงานอัตโนมัติ
5. การติดตามและแจ้งเตือน (Monitoring and Alerting)
Cloud Composer มาพร้อมกับ Dashboard ที่ช่วยให้คุณสามารถติดตามสถานะของ Workflow ได้
✅ ดูสถานะของ Task: เช่น Success, Failed, Pending
✅ ตั้งค่าแจ้งเตือน ผ่านอีเมลหรือบริการอื่นๆ เมื่อ Workflow ล้มเหลว
6. รองรับ Multi-cloud และ On-premises
Cloud Composer สามารถเชื่อมต่อ workflow กับ บริการนอก GCP หรือ On-premises ได้ง่าย เช่น:
✅ AWS S3
✅ MySQL
✅ PostgreSQL
7. Auto-scaling และ Cost-efficiency
✅ Cloud Composer รองรับ Auto-scaling ที่ช่วยปรับทรัพยากรอัตโนมัติตามปริมาณงาน ซึ่งไม่เพียงแต่ช่วยเพิ่มประสิทธิภาพ แต่ยังช่วย ประหยัดค่าใช้จ่าย
✅ ระบบจะปรับทรัพยากรให้เหมาะสม โดยไม่ต้องกังวลเรื่องการใช้ทรัพยากรเกินความจำเป็น ช่วยให้คุณจ่ายเฉพาะสิ่งที่ใช้งานจริง
8. การจัดการ Workflow ด้วย UI และ CLI
จัดการ workflow ได้ง่ายผ่าน:
✅ UI ที่ใช้งานง่าย: ช่วยให้คุณจัดการ DAG และ Task ได้ในไม่กี่คลิก 🖱️
✅ Command Line Interface (CLI): เหมาะสำหรับ Developer ที่ชอบควบคุมงานด้วยคำสั่ง
และนี่คือ Key Features หลักๆ ของ Cloud Composer ที่ช่วยให้การจัดการ Workflow เป็นเรื่องง่ายและมีประสิทธิภาพ 🙌
โครงสร้างการทำงานของ Cloud Composer ☁️ ประกอบด้วยส่วนสำคัญ ดังต่อไปนี้:
✅ Cloud Composer ใช้ GKE ในการรัน Apache Airflow และจัดการ Workflow
✅ แต่ละ DAG จะถูกรันเป็น Task ภายใน Cluster ของ GKE
✅ GKE ทำหน้าที่ สร้าง, จัดการ และลบคลัสเตอร์ที่ใช้รันส่วนประกอบต่างๆ ของ Airflow
เปรียบ GKE เหมือน "บ้าน" ของ Airflow ที่มีทรัพยากรสำคัญ เช่น CPU, หน่วยความจำ และพื้นที่เก็บข้อมูล ซึ่ง Cloud Composer จัดการให้ทั้งหมด
✅ ใช้สำหรับเก็บไฟล์ DAG (Workflow Definitions), Logs และข้อมูลอื่นๆ ที่เกี่ยวข้อง
✅ ไฟล์ DAG จะถูกอัปโหลดไว้ใน Cloud Storage Bucket
✅ ใช้เก็บ DAGs, Plugins และ Logs
การเก็บไฟล์ใน Cloud Storage ทำให้คุณสามารถ จัดการเวอร์ชัน และ แชร์ DAGs ได้อย่างง่ายมากขึ้น
✅ Scheduler ทำหน้าที่กำหนดเวลาและเรียกใช้งาน DAGs (Directed Acyclic Graphs) เปรียบเหมือน "หัวหน้า" ที่คอยสั่งงาน
✅ Workers ประมวลผล Task ต่างๆ ที่ถูกกำหนดใน DAGs เปรียบเหมือน "พนักงาน" ที่ทำงานตามคำสั่งของ Scheduler
✅ Web Server เป็นอินเทอร์เฟซที่ใช้สำหรับจัดการและตรวจสอบ DAGs ผ่านเว็บเบราว์เซอร์ เปรียบเหมือน "ห้องควบคุม" ที่ให้คุณสามารถดูสถานะต่างๆ
✅ ใช้เก็บข้อมูลเกี่ยวกับ DAGs, Task และสถานะการทำงานต่างๆ
✅ เปรียบเหมือน "สมุดบันทึก" ที่เก็บข้อมูลทุกอย่างเกี่ยวกับ Airflow
✅ ฐานข้อมูลนี้ใช้จัดเก็บ Metadata เช่น สถานะของ Workflow, Task และการตั้งเวล
IAM (Identity and Access Management)
✅ ใช้จัดการสิทธิ์การเข้าถึงทรัพยากรต่างๆ เช่น
การจัดการ Workflow
การเข้าถึง Cloud Storage
✅ การใช้ IAM ทำให้การรักษาความปลอดภัยของ Workflow เป็นไปอย่างมีประสิทธิภาพ และสามารถควบคุมการเข้าถึงได้อย่างละเอียดเพื่อป้องกันการเข้าถึงที่ไม่เหมาะสม
✅ ติดตามสถานะ Workflow ผ่าน Dashboard ของ Airflow
✅ Logs จะถูกส่งไปยัง Cloud Logging เพื่อช่วยในการวิเคราะห์และตรวจสอบปัญหาอย่างรวดเร็ว
การทำงานร่วมกันของส่วนต่างๆ เหล่านี้ทำให้ Cloud Composer สามารถจัดการ Workflow อย่างมีประสิทธิภาพ โดยสามารถติดตามและควบคุมทุกขั้นตอนของกระบวนการทำงานได้อย่างใกล้ชิดได้ทุกขั้นตอน!
โอเคค่ะทุกคน ทีนี้เราจะมาลุยสร้าง Cloud Composer กัน! ลองนึกภาพว่าเรามีชุดข้อมูลที่กระจายอยู่ทั่วโลก และข้อมูลเหล่านี้อยู่ใน Cloud Storage Buckets หรือ BigQuery Tables แล้วคำถามคือ... เราจะจัดการข้อมูลเหล่านี้ยังไงเพื่อให้สามารถรวบรวมมาวิเคราะห์และสร้างประโยชน์ให้กับธุรกิจของเรา?
วันนี้คลาวด์ เอซ จะพาทุกท่านมาหาคำตอบว่า Cloud Composer จะเข้ามาช่วยในการทำ Data Orchestration หรือการสร้าง Workflow ในการทำงานได้อย่างไร!
สิ่งที่เราจะได้เรียนรู้ในวันนี้:
1. สร้าง Cloud Composer environment
2. สร้าง Cloud Storage buckets
3. สร้าง BigQuery datasets
4. สร้างและรัน Apache Airflow workflows เพื่อย้ายข้อมูลระหว่าง Cloud Storage และ BigQuery
พร้อมลุยแล้วใช่มั้ย? มาทำข้อแรกกันเลย! 🔥
1. เข้าในไปที่ Google Cloud Console เสิร์ช Cloud Composer
2. เมื่อเห็นหน้าจอแบบนี้ ให้คลิกที่ CREATE ENVIRONMENT แล้วเลือก Composer 3
3. ทีนี้เราจะมาใส่รายละเอียดของ Composer ตัวนี้ที่เราจะสร้างขึ้นมา
4. จากนั้นเลื่อนลงมา แล้วเลือก Environment resources กด SMALL
5. หลังจากนั้นเลื่อนลงมาจะเจอ Advanced Configuration ให้เลือกตรง Airflow database zone เป็น us-central1-a.
6. สุดท้ายกดที่ CREATE ได้เลย ส่วนนี้จะใช้เวลาค่อนข้างนานในการสร้าง Composer
ทีนี้เราจะมาสร้าง Buckets กันก่อนใน Cloud Storage
1. เปิดไปที่ Cloud Storage เลือก Buckets แล้วกด CREATE
2. หลังจากนั้นตั้งชื่อและเลือก region ตามรูปด้านล่างนี้ 👇 แล้วกด Create ได้เลย
3. หลังจากที่เรามี Bucket ที่เป็น region us แล้ว ต่อไปเราจะไปสร้าง region eu กันต่อ เราสามารถทำตาม step เดิมได้เลย แค่เปลี่ยน Region ตามรูปด้านล่างนี้
ทีนี้เราจะมาสร้าง Dataset ที่จะเป็น Dataset ปลายทางของเราใน region eu บน Bigquery
1. ไปที่ BigQuery กด: เลือก Create dataset
2. จากนั้นใส่รายละเอียดตามนี้ได้เลย
3. หลังจากนั้นกด CREATE DATASET ได้เลย
ก่อนที่เราจะไปกันต่อ... มาทำความเข้าใจ Airflow กันก่อนดีกว่า!
Airflow คือเครื่องมือที่ช่วย จัดการงานหรือกระบวนการทำงานต่างๆ ให้เป็นระบบอัตโนมัติ เช่น การตั้งเวลาและติดตามการทำงานของ Task หลายๆ งาน
แล้ว Airflow ทำงานยังไง?
เราสร้าง DAG (Directed Acyclic Graph) ซึ่งเป็นแผนผังที่กำหนดลำดับการทำงานของแต่ละ Task ว่าต้องทำอะไรก่อน-หลัง
Airflow จะทำการรันงานตามลำดับที่เรากำหนดไว้โดยอัตโนมัติ ทำให้ทุกขั้นตอนของกระบวนการทำงานเป็นไปอย่างราบรื่น
แนวคิดหลักของ Airflow
✅ DAG (Directed Acyclic Graph)
คือกราฟของงานที่มีลำดับชัดเจนและไม่มี Loop ซึ่งบอกลำดับการทำงานของ Task ต่างๆ
✅ Operator
คือประเภทของ Task เช่น BashOperator ที่ใช้รันคำสั่ง Bash Operator ที่ใช้รันคำสั่ง Bash
✅ Task
เป็น Instance ของ Operator ที่เรากำหนดค่าต่างๆ สำหรับการทำงานแต่ละขั้นตอน โดยแต่ละ Task คือ Node ใน DAG
✅ Task Instance
คือการรัน Task จริง ๆ ในเวลาที่กำหนด ซึ่งแต่ละ Task Instance จะมีสถานะ เช่น ✅ success, ❌ failed, ⏳ running, ⏭️ skipped
โอเคตอนนี้เราก็น่าจะเข้าใจ Airflow พื้นฐานกันแล้ว พร้อมลุยในขั้นต่อไปกันเลย!
Cloud Composer ใช้ DAGs (Directed Acyclic Graphs) ในการสร้างและจัดการ Workflow
ในไฟล์ bq_copy_across_locations.py
ไฟล์นี้คือ DAG หรือ Workflow ที่กำหนดขั้นตอนการทำงานทั้งหมด
ใน DAG นี้จะมีการใช้ Operator หลักๆ ดังนี้:
DummyOperator : ใช้สร้างงานจำลอง (Dummy Task) ที่เป็นจุดเริ่มต้น (Start) และจุดสิ้นสุด (End) เพื่อให้ DAG ดูเป็นระเบียบและเข้าใจง่ายขึ้น
BigQueryToCloudStorageOperator : ใช้สำหรับส่งออกข้อมูลจาก BigQuery ไปยัง Cloud Storage ในรูปแบบไฟล์ Avro
GoogleCloudStorageToGoogleCloudStorageOperator : ใช้สำหรับคัดลอกไฟล์จาก Cloud Storage หนึ่งไปยังอีก Cloud Storage หนึ่ง
GoogleCloudStorageToBigQueryOperator : ใช้สำหรับนำเข้าข้อมูลจากไฟล์ Avro ใน Cloud Storage ไปยัง BigQuery
พอจะเข้าใจคอนเซ็ปต์แล้วใช่ไหม? มาลุยกันต่อเลย!
✅ ตัวอย่างนี้ เป็นการอธิบายฟังก์ชัน read_table_list() ซึ่งถูกออกแบบมาเพื่ออ่านไฟล์ config (ไฟล์ CSV) และสร้างรายการของตาราง (tables) ที่จะใช้ในการคัดลอกข้อมูลใน Airflow DAG
# --------------------------------------------------------------------------------
# Functions
# --------------------------------------------------------------------------------
def read_table_list(table_list_file):
"""
Reads the master CSV file that will help in creating Airflow tasks in
the DAG dynamically.
:param table_list_file: (String) The file location of the master file,
e.g. '/home/airflow/framework/master.csv'
:return master_record_all: (List) List of Python dictionaries containing
the information for a single row in master CSV file.
"""
master_record_all = []
logger.info('Reading table_list_file from : %s' % str(table_list_file))
try:
with open(table_list_file, 'rb') as csv_file:
csv_reader = csv.reader(csv_file)
next(csv_reader) # skip the headers
for row in csv_reader:
logger.info(row)
master_record = {
'table_source': row[0],
'table_dest': row[1]
}
master_record_all.append(master_record)
return master_record_all
except IOError as e:
logger.error('Error opening table_list_file %s: ' % str(
table_list_file), e)
✅ ตัวอย่างนี้ เป็นการกำหนด DAG ชื่อ bq_copy_us_to_eu_01 สำหรับการจัดการ Workflow ใน Airflow เรามาดูรายละเอียดกันดีกว่า
Bq_copy_us_to_eu_01 เป็นชื่อที่ตั้งให้กับ DAG นี้ ซึ่งมันบอกถึงการคัดลอกข้อมูลจากโซน US ไปยัง EU
default_args คือค่าพื้นฐานที่ใช้กำหนดพฤติกรรมของ DAG และ Tasks ใน DAG
default_args = {
'owner': 'airflow', # เจ้าของ DAG (กำหนดเป็น 'airflow' โดยทั่วไป)
'start_date': datetime.today(), # วันที่เริ่มต้น (ใช้วันปัจจุบัน)
'depends_on_past': False, # ไม่ต้องรอให้ Task ก่อนหน้าเสร็จสมบูรณ์
'email': [''], # อีเมลสำหรับแจ้งเตือน (ยังไม่ได้ตั้งค่า)
'email_on_failure': False, # ไม่ส่งอีเมลเมื่อเกิดความล้มเหลว
'email_on_retry': False, # ไม่ส่งอีเมลเมื่อมีการลองใหม่
'retries': 1, # ลองใหม่ได้ 1 ครั้งถ้าล้มเหลว
'retry_delay': timedelta(minutes=5), # รอ 5 นาทีก่อนลองใหม่}
รายละเอียดเพิ่มเติม
✅ owner: ใช้ระบุว่าใครเป็นผู้ดูแล DAG นี้
✅ start_date: วันที่ที่ DAG เริ่มต้นทำงาน (ใช้ datetime.today() ซึ่งหมายถึงวันปัจจุบัน)
✅ depends_on_past: ถ้าเป็น True จะต้องรอ Task ก่อนหน้าใน DAG ทำงานเสร็จในรอบก่อน ๆ แต่ในที่นี้เป็น False
✅ email: อีเมลที่ใช้รับการแจ้งเตือน (ยังไม่ได้ตั้งค่า)
✅ email_on_failure / email_on_retry: ปิดการแจ้งเตือนผ่านอีเมลในกรณีเกิดปัญหาหรือ Task ล้มเหลว
✅ retries: จำนวนครั้งที่ Task สามารถลองทำใหม่ได้ในกรณีล้มเหลว
✅ retry_delay: ระยะเวลาที่จะรอก่อนลองใหม่
# DAG object.
with models.DAG('bq_copy_us_to_eu_01',
default_args=default_args,
schedule_interval=None) as dag:
✅ models.DAG: เป็นคำสั่งสำหรับสร้าง DAG
✅ schedule_interval=None: หมายความว่า DAG นี้ไม่มีการตั้งเวลาอัตโนมัติ (ต้องรันแบบ Manual หรือกดรันเอง)
✅ default_args=default_args: ใช้ค่าพื้นฐานจากตัวแปร default_args
สรุป DAG ชื่อ bq_copy_us_to_eu_01 ถูกกำหนดให้ไม่ทำงานโดยอัตโนมัติและต้องรันเอง (Manual Trigger) โดยมีการตั้งค่าเริ่มต้นต่างๆ เช่น จำนวนครั้งที่ Task สามารถลองใหม่ได้ และระยะเวลารอก่อนลองใหม่
โอเคจากที่เราสร้าง bucket ใน cloud storage สร้าง dataset ใน bigquery และอธิบายเรื่อง composer ไปแล้ว คิดว่า composer ของเราน่าจะสร้างเสร็จแล้ว เราไปดูกัน!
โอเค composer ของเราสร้างเสร็จแล้ว เราลองเข้าไปดู detail กัน
นี่คือหน้า Environment details page เป็นหน้าที่ให้ข้อมูลเกี่ยวกับ Environment ของ Cloud Composer ในหน้านี้จะแสดงข้อมูลสำคัญ เช่น
✅ Airflow web UI URL: ลิงก์สำหรับเข้าถึง Airflow Web UI ซึ่งเป็นหน้าสำหรับจัดการและติดตาม DAGs (Workflows)ใช้เพื่อตรวจสอบสถานะของ DAGs, ดูรายละเอียดของ Task ต่าง ๆ และเรียกใช้งาน DAG แบบ Manual
✅ ชื่อของ Cloud Storage Bucket ที่เชื่อมกับโฟลเดอร์ DAGs: Cloud Composer จะใช้ Cloud Storage เป็นที่เก็บ DAG files ไฟล์ Python DAGs จะถูกเก็บไว้ใน Cloud Storage Bucket ที่สร้างขึ้นสำหรับ Environment นั้น
ต่อไปเราต้องสร้าง Virtual Environment เปิด Cloud Shell ก่อน แล้ว copy รันตาม step นี้ได้เลย
1. Install the virtualenv environment
2. Build the virtual environment
3. Activate the virtual environment
ใน Cloud Composer ไฟล์ DAGs จะถูกเก็บไว้ใน Cloud Storage Bucket ที่สร้างขึ้นโดยอัตโนมัติเมื่อเราสร้าง Environment ใหม่ ก่อนอื่นเลยไปที่ Cloud Storage หาชื่อของ DAGs Bucket ซึ่งจะมีรูปแบบประมาณนี้
REGION-composer-advanced-YOURDAGSBUCKET-bucket
ซึ่งเราได้มาเป็น us-central1-composer-advanc-03aeaceb-bucket
ทีนี้เราก็นำชื่อไปแทนที่ในคำสั่งนี้ แล้วกด enter ใน Cloud Shell ได้เลย
DAGS_BUCKET=<REGION-composer-advanced-YOURDAGSBUCKET-bucket>
หรือเป็นรูปแบบนี้
DAGS_BUCKET=<us-central1-composer-advanc-03aeaceb-bucket>
*ซึ่งเราจะใช้ตัวแปรนี้ในตัวอย่างการเริ่มใช้ Cloud Composer เก็บตัวแปรนี้ไว้ก่อนเราจะไปใช้ใน step ต่อๆ ไป
ต่อมาเราจะต้อง set variable ให้ airflow ซึ่งมันก็คือค่าตัวแปรที่ใช้ใน Apache Airflow เพื่อเก็บข้อมูลที่สามารถนำไปใช้ซ้ำใน DAGs
ในขั้นตอนนี้ จะต้องตั้งค่าตัวแปร 3 ตัว เพื่อใช้ใน DAG ที่จะ deploy
สรุปแบบเข้าใจง่าย
☑️ DAG จะใช้ไฟล์ CSV (table_list_file_path) เป็นตัวกำหนดว่าต้องคัดลอกตารางอะไรบ้าง
☑️ ข้อมูลจาก BigQuery (US) จะถูก export ไปที่ gcs_source_bucket
☑️ จากนั้น ข้อมูลจะถูกคัดลอกไปยัง gcs_dest_bucket
☑️ สุดท้าย DAG จะ import ข้อมูลจาก gcs_dest_bucket ไปยัง BigQuery (EU)
พูดง่ายๆ ก็คือ Airflow DAG นี้ช่วยย้ายข้อมูลจาก BigQuery US → Cloud Storage → BigQuery EU นั้นก็คือจุดประสงค์ของเราที่ทำมาตั้งแต่เริ่มต้นนั้นเอง
✅ ต่อไปเราจะใช้คำสั่ง gcloud composer environments run เพื่อตั้งค่าตัวแปร Airflow Variables ที่ DAG ใช้ทำงาน โดยเราจะใช้ Airflow CLI sub-command: variables เพื่อกำหนดค่าให้กับตัวแปร
✅ คำสั่ง gcloud composer environments run จะมีรูปแบบ ดังนี้
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION variables -- \
set KEY VALUE
🔹 คำสั่งที่ต้องใช้ใน Cloud Shell
เราต้องรันคำสั่ง 3 ครั้ง เพื่อตั้งค่าตัวแปรทั้ง 3 ตัว ดังนี้:
1️⃣ ตั้งค่า table_list_file_path (ไฟล์ CSV ที่ใช้กำหนดตารางที่ต้องคัดลอก)
gcloud composer environments run composer-advanced-lab \
--location "REGION" variables -- \
set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv
2️⃣ ตั้งค่า gcs_source_bucket (Bucket ที่ใช้เก็บข้อมูลจาก BigQuery US)
gcloud composer environments run composer-advanced-lab \
--location "us-central1" variables -- \
set gcs_source_bucket {UNIQUE ID}-us
3️⃣ ตั้งค่า gcs_dest_bucket (Bucket ที่ใช้เก็บข้อมูลไปยัง BigQuery EU)
gcloud composer environments run composer-advanced-lab \
--location "REGION" variables -- \
set gcs_dest_bucket {UNIQUE_ID}-eu
อย่าลืมเปลี่ยนค่า "REGION = us-central1" เป็นโซนที่ใช้ และ {UNIQUE ID} = qwiklabs-gcp-04-9582c46403b3 เป็น ID โปรเจกต์ของเรานะ
ตัวอย่างใน Cloud Shell หลังจากเรารัน 3 คำสั่งนี้แล้วจะแสดงดังภาพตัวอย่างนี้
สรุปแบบเข้าใจง่าย
1️⃣ ใช้ gcloud composer ตั้งค่า Airflow Variables 3 ตัว (table_list_file_path, gcs_source_bucket, gcs_dest_bucket)
2️⃣ ใช้คำสั่ง get เพื่อตรวจสอบค่าที่ตั้งไปแล้ว
3️⃣ ถ้ามี Error (NoneType object is not callable) ไม่ต้องตกใจ เพราะค่ายังตั้งได้ตามปกติ
พูดง่ายๆ คือ เราใช้คำสั่งนี้เพื่อให้ DAG ของ Airflow รู้ว่า ไฟล์ตารางอยู่ที่ไหน และต้องเก็บข้อมูลที่ไหนก่อนจะ Import เข้า BigQuery!
การอัปโหลด DAG และ Dependencies ไปยัง Cloud Storage
ในขั้นตอนนี้ เราจะอัปโหลดไฟล์ที่จำเป็นสำหรับ Airflow DAG ไปยัง Cloud Storage ที่เชื่อมต่อกับ Cloud Composer เพื่อให้ DAG สามารถทำงานได้ตามที่เราต้องการ
✅ 1. คัดลอกตัวอย่างไฟล์จาก Google Cloud ไปยัง Cloud Shell
ใช้คำสั่งนี้เพื่อคัดลอก ตัวอย่างโค้ด Python ที่ Google Cloud เตรียมไว้ ลงใน Cloud Shell
cd ~
gcloud storage cp -r gs://spls/gsp283/python-docs-samples.
สรุปแบบเข้าใจง่าย:
คำสั่ง gcloud storage cp -r ใช้คัดลอกไฟล์จาก Cloud Storage (Google ที่เตรียมไว้ให้) ลงในเครื่อง Cloud Shell
ไฟล์ที่คัดลอกมาจะอยู่ในโฟลเดอร์ python-docs-samples ใน Cloud Shell
✅ 2. อัปโหลด Plugins ของ Third-party ไปยัง Cloud Storage ของ Composer
บาง DAG ใช้ Hook และ Operator จาก Third-party (ปลั๊กอินเสริม) ซึ่งต้องอัปโหลดไปยัง plugins folder ของ Cloud Composer
gcloud storage cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
สรุปแบบเข้าใจง่าย:
คำสั่งนี้จะคัดลอกปลั๊กอินไปยัง Cloud Storage Bucket ของ Cloud Composer
${DAGS_BUCKET} คือ ชื่อ Bucket ที่ Composer ใช้เก็บ DAGs
ปลั๊กอินเหล่านี้ช่วยให้ Airflow เชื่อมต่อกับระบบอื่นๆ ได้
✅ 3. อัปโหลด DAG และ Config File ไปยัง Cloud Storage ของ Composer
ตอนนี้เราจะอัปโหลด DAG (Workflow) และไฟล์ CSV ที่ใช้กำหนดข้อมูลที่ต้องคัดลอก ไปยัง Cloud Storage Bucket ของ Composer
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://us-central1-composer-advanc-03aeaceb-bucket/dags
สรุปแบบเข้าใจง่าย:
ไฟล์ bq_copy_across_locations.py เป็นโค้ด DAG ที่ใช้ควบคุม Workflow
ไฟล์ bq_copy_eu_to_us_sample.csv เป็นไฟล์ที่กำหนดตาราง BigQuery ที่ต้องคัดลอกจาก EU ไป US
DAG จะถูกเก็บไว้ใน โฟลเดอร์ dags ของ Cloud Composer
หลังจากอัปโหลดเสร็จ Cloud Composer จะลงทะเบียน DAG ให้โดยอัตโนมัติภายใน 3-5 นาที
เราสามารถเข้าไปที่ Airflow Web UI เพื่อตรวจสอบสถานะ DAG นี้ จะยังไม่ถูกตั้งเวลาให้รันอัตโนมัติ ตามค่าที่เราตั้งไว้ (schedule_interval=None)
✅ สรุปในขั้นตอนนี้แบบเข้าใจง่าย!!!
ดาวน์โหลดโค้ดตัวอย่างของ Google Cloud ไปยัง Cloud Shell
อัปโหลด Plugins ของ Third-party ไปที่ Cloud Storage ของ Composer
อัปโหลด DAG และไฟล์ CSV ที่ใช้ใน Workflow ไปที่ Cloud Storage
รอ 3-5 นาที เพื่อให้ DAG ปรากฏใน Airflow UI
ตรวจสอบใน Airflow UI ว่า DAG ถูกโหลดแล้ว และยังไม่ได้ตั้งเวลาให้รันอัตโนมัติ
สั้นๆ คือ เรากำลังอัปโหลดโค้ด DAG + ไฟล์ที่ต้องใช้ไปไว้ที่ Cloud Storage ของ Composer เพื่อให้ DAG ทำงานได้!
🔸 วิธีเข้าถึง Airflow Web Interface และเรียกใช้งาน DAG
Airflow Web Interface เป็นเหมือนหน้าจอที่ช่วยให้เราควบคุมและตรวจสอบ DAG ใน Cloud Composer ได้ง่ายขึ้น เช่น ดูค่าตัวแปร, เรียกใช้ DAG, และติดตามสถานะของ Task
1. เข้าใช้งาน Airflow Web Interface
2. ไปที่หน้า Cloud Composer Environments ไปที่เมนู Composer > Environments
3. หลังจากนั้น กดตรง Airflow ตามรูปด้านล่างนี้ 👇🏻
แล้วเราเปิดมาก็จะเจอหน้าตาแบบนี้ นี่คือหน้าตาของ Airflow Interface นั้นเอง
ทีนี้หลังจากที่เราเปิดมาที่หน้า Airflow UI แล้ว ในขั้นตอนนี้เราจะมาดู ค่าตัวแปรที่เราตั้งค่าไปก่อนหน้านี้ (เช่น gcs_source_bucket และ gcs_dest_bucket) ที่ถูกบันทึกไว้ใน Environment แล้ว และสามารถตรวจสอบได้จาก Airflow UI
กดไปที่ Admin > Variables
หน้าตาจะเป็นแบบนี้ 👇
4. เรียกใช้ DAG ด้วยตัวเอง (Trigger DAG manually)
ปกติ DAG จะรันตามกำหนดเวลา แต่เราสามารถสั่งให้มันทำงานได้ทันทีด้วยตัวเราเองได้ เดี๋ยวเรามาลองทำกัน!
🔸 เรามาสรุปในส่วนนี้กันดีกว่าว่าเราทำอะไรกันไปบ้าง
1️⃣ เข้า Airflow Web UI → ไปที่ Cloud Composer > Environments > คลิกลิงก์ Airflow
2️⃣ ดูค่าตัวแปร (Variables) → ไปที่ Admin > Variables
3️⃣ รัน DAG ด้วยตัวเอง (Trigger DAG manually) → ไปที่ DAGs > กดปุ่ม ▶️ > กด "Trigger DAG"
ตรวจสอบสถานะ DAG และ Task ใน Airflow UI
ก็คือเรากำลังเข้าดู Airflow UI, ตรวจสอบค่าตัวแปร และสั่งให้ DAG ทำงานทันทีนั้นเอง
🔸 วิธีดูสถานะ DAG และรันใหม่ใน Airflow UI
เมื่อเรา อัปโหลดไฟล์ DAG ไปยัง Cloud Storage ในโฟลเดอร์ dags/ ของ Cloud Composer ระบบจะ ตรวจสอบ (Parse) ไฟล์ และถ้าไม่มีข้อผิดพลาด
✅ DAG จะปรากฏในรายการ DAGs และพร้อมทำงานทันที (ถ้าตั้งค่าให้ทำงานอัตโนมัติ)
1. เช็คสถานะ DAG ใน Airflow UI
ไปที่แท็บ "DAGs" แล้วหา DAG ที่เราอัปโหลด
ถ้ารันสำเร็จ = สถานะ "Runs" จะเป็นสีเขียว ✅
ถ้าล้มเหลว = สถานะ "Runs" จะเป็นสีแดง ❌
2. ดูโครงสร้าง DAG และสถานะของ Task
🟢 ขอบสีเขียว = กำลังรันอยู่
🔴 ขอบสีแดง = รันล้มเหลว
⚪ ขอบสีเทา = ยังไม่ได้รัน
3. รัน DAG ใหม่จาก Graph View
ถ้า DAG มีปัญหาหรือต้องการรันใหม่
✅ วิธีล้างค่าและรันใหม่ ในหน้า Graph View
🔸 สรุปแบบเข้าใจง่าย
1. อัปโหลด DAG ไปที่ Cloud Storage → Composer จะโหลดเข้า Airflow ให้อัตโนมัติ
2. ดูสถานะ DAG ใน Airflow UI (สีเขียว ✅ = ทำงาน, สีแดง ❌ = ล้มเหลว)
3. กดเข้า "Graph View" เพื่อดูโครงสร้างของ DAG
4. ถ้าต้องการรันใหม่ → คลิก Start > Clear > OK
🔥 แค่นี้ก็สามารถตรวจสอบและรัน DAG ใน Airflow ได้ง่ายๆ
ตรวจสอบสถานะและผลลัพธ์ของ Workflow ใน Cloud Console (ฉบับเข้าใจง่าย)
หลังจากที่เรารัน DAG สำเร็จ แล้ว
1. ตรวจสอบไฟล์ที่ถูก Export ใน Cloud Storage
เราจะตรวจสอบว่าไฟล์ข้อมูลถูกคัดลอกจาก US ไปยัง EU แล้วหรือยัง
✔️ วิธีตรวจสอบใน Cloud Storage:
✅ ถ้ามีไฟล์อยู่ครบ แสดงว่าการคัดลอกไฟล์สำเร็จ
2. ตรวจสอบว่าตารางถูก Import ไปยัง BigQuery แล้ว
เมื่อไฟล์ถูกคัดลอกแล้ว BigQuery ควรจะนำเข้าข้อมูลเข้าสู่ Dataset ที่เรากำหนดไว้
✔️ วิธีตรวจสอบใน BigQuery:
✅ ถ้าเห็นตารางใน Dataset แสดงว่าการ Import ข้อมูลสำเร็จ
จบไปแล้วกับ Blog เรื่อง Cloud Composer ที่ยาวมากกกก! ขอบคุณที่อ่านกันจนจบนะคะ หวังว่าทุกคนจะได้ความรู้ไปไม่น้อยเลย แล้วครั้งหน้าจะเป็นเรื่องอะไร อย่าลืมติดตามกันนะคะ รับรองว่าไม่พลาดความรู้ใหม่ๆ แน่นอน เจอกันในบล็อกหน้า! ✨📚