"""Init DAG Create table and partitions for import_wikidata and import_commons dag Import Commons TTL Parse and munge commons TTL dumps - Entities dump start on sundays and takes a day - These dumps are copied to a cloud replica - The refinery job runs hdfs_rsync every days at 2:30 am (see puppet import_commons_mediainfo_dumps.pp) Import Wikidata TTL Parse and munge wikidata TTL dumps - Entities dump start on mondays and takes a couple days - Lexemes starts on saturdays - These dumps are copied to a cloud replica - The refinery job runs hdfs_rsync every days around 1am and 3am (see puppet import_wikidata_entities_dumps.pp) """ from datetime import datetime, timedelta from typing import Mapping import pendulum from airflow.operators.empty import EmptyOperator from airflow.providers.apache.hive.operators.hive import HiveOperator from search.config.dag_config import data_path, wdqs_spark_tools, create_easy_dag from wmf_airflow_common.config.dag_properties import DagProperties from wmf_airflow_common.operators.spark import SparkSubmitOperator from wmf_airflow_common.sensors.url import URLSensor props = DagProperties( commons_dump_dir="hdfs://analytics-hadoop/wmf/data/raw/commons/dumps/mediainfo-ttl", all_ttl_dump_dir="hdfs://analytics-hadoop/wmf/data/raw/wikidata/dumps/all_ttl", lexemes_ttl_dump_dir="hdfs://analytics-hadoop/wmf/data/raw/wikidata/dumps/lexemes_ttl", rdf_data_table="discovery.wikibase_rdf", rdf_data_location="wikidata/rdf/", rdf_data_subgraphs_table="discovery.wikibase_rdf_subgraphs", rdf_data_subgraphs_location="wikidata/rdf_subgraphs/", munged_n3_dump_location="wikidata/munged_n3_dump/", entity_revision_map_wdqs=f'{data_path}/wdqs/entity_revision_map', entity_revision_map_wcqs=f'{data_path}/wcqs/entity_revision_map' ) def export_n3_data(input_part: str, output_path: str, filename_format: str, nb_partitions: int, task_id: str, conf: Mapping[str, str]) -> SparkSubmitOperator: """ :param input_part: input partition (rdf triple table) :param output_path: output hdfs path :param filename_format: format of filename, must include a %d placeholder for the partition index :param nb_partitions: number of partitions (resulting files) :param task_id: task id :param conf: default spark configuration :return: the corresponding SparkSubmitOperator """ return SparkSubmitOperator( task_id=task_id, application=wdqs_spark_tools, java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.NTripleGenerator", max_executors=32, executor_cores=4, executor_memory="8g", executor_memory_overhead="4g", driver_memory="8g", driver_cores=2, sql_shuffle_partitions=512, conf={ **conf, # As noted above, our HDFS defaults do not allow world-read. # This dataset contains only public data, so we make it readable by all. 


with create_easy_dag(
        'import_ttl_init',
        start_date=datetime(2023, 1, 30),
        schedule='@once',
        tags=["manual", "query_service"]
) as dag_init:
    complete = EmptyOperator(task_id='complete')

    HiveOperator(
        task_id='create_tables',
        hql=f"""
            CREATE TABLE IF NOT EXISTS {props.rdf_data_table} (
                `context` string,
                `subject` string,
                `predicate` string,
                `object` string
            )
            PARTITIONED BY (
                `date` string,
                `wiki` string
            )
            STORED AS PARQUET
            LOCATION '{data_path}/{props.rdf_data_location}';

            CREATE TABLE IF NOT EXISTS {props.rdf_data_subgraphs_table} (
                `subject` string,
                `predicate` string,
                `object` string,
                `context` string
            )
            PARTITIONED BY (
                `snapshot` string,
                `wiki` string,
                `scope` string
            )
            STORED AS PARQUET
            LOCATION '{data_path}/{props.rdf_data_subgraphs_location}';
        """
    ) >> complete
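
# How the weekly DAGs below pick the dump date to wait for: the Jinja template
#   "{{ data_interval_start.next(day_of_week=p.SUNDAY) | ds_nodash }}"
# moves the data interval start forward to the weekday the dump generation starts on.
# A minimal sketch with a hypothetical interval (an import_commons_ttl run triggered
# on Wed 2023-02-08 03:00 has data_interval_start Wed 2023-02-01 03:00):
#   import pendulum
#   start = pendulum.datetime(2023, 2, 1, 3)
#   start.next(day_of_week=pendulum.SUNDAY).format('YYYYMMDD')  # -> '20230205'
# i.e. that run waits for the dump that started on Sun 2023-02-05, which is also the
# name of the dump sub-directory the sensors below poll for an _IMPORTED flag.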


with create_easy_dag(
        'import_commons_ttl',
        start_date=datetime(2023, 1, 30),
        # The commons ttl dump is scheduled on Sundays.
        # The dump arrives on the cloud replica on Monday and is picked up by the
        # hdfs_rsync running on Tuesday morning.
        # Start the job on Wednesdays at 3am.
        schedule='0 3 * * 3',
        # As a weekly job there should never really be more than
        # one running at a time.
        max_active_runs=1,
        catchup=False,
        user_defined_macros={
            'p': pendulum,
        },
        tags=["weekly", "query_service", "commons"]
) as commons_dag:
    commons_ds = "{{ data_interval_start.next(day_of_week=p.SUNDAY) | ds_nodash }}"
    path = f"{props.commons_dump_dir}/{commons_ds}/_IMPORTED"
    wiki = "commons"
    rdf_table_and_partition = f'{props.rdf_data_table}/date={commons_ds}/wiki={wiki}'

    commons_sensor = URLSensor(
        task_id="wait_for_mediainfo_ttl_dump",
        url=path,
        poke_interval=timedelta(hours=1).total_seconds(),
        timeout=timedelta(days=1).total_seconds())

    input_path = f"{props.commons_dump_dir}/{commons_ds}/commons-{commons_ds}-mediainfo.ttl.bz2"

    munge_and_import_commons_dumps = SparkSubmitOperator(
        task_id='munge_dumps',
        application=wdqs_spark_tools,
        java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.WikibaseRDFDumpConverter",  # noqa
        max_executors=25,
        executor_cores=8,
        executor_memory="16g",
        driver_memory="2g",
        conf={
            **commons_dag.default_args.get('conf'),
            # Our hdfs defaults do not allow world-read. This dataset contains
            # only public data, so we make it readable by all.
            'spark.hadoop.fs.permissions.umask-mode': '0022'
        },
        application_args=[
            '--input-path', input_path,
            '--output-table', rdf_table_and_partition,
            '--skolemize',
            '--site', wiki,
        ]
    )

    generate_entity_rev_map = SparkSubmitOperator(
        task_id='gen_rev_map',
        application=wdqs_spark_tools,
        java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.EntityRevisionMapGenerator",  # noqa
        max_executors=25,
        executor_cores=8,
        executor_memory="16g",
        driver_memory="2g",
        application_args=[
            '--input-table', rdf_table_and_partition,
            '--output-path', f"{props.entity_revision_map_wcqs}/{commons_ds}/rev_map.csv",
            '--uris-scheme', 'commons',
            '--hostname', 'commons.wikimedia.org',
        ]
    )

    end = EmptyOperator(task_id='complete')

    end << generate_entity_rev_map << munge_and_import_commons_dumps << commons_sensor


with create_easy_dag(
        'import_wikidata_ttl',
        start_date=datetime(2023, 1, 30),
        # The all_ttl dump is scheduled on Mondays and the lexemes dump on Fridays.
        # The all_ttl dump arrives on the cloud replica on Thursday morning (5am - 7am)
        # and is picked up by the hdfs_rsync running on Friday morning.
        # Start the job on Fridays at 3am; we will probably wait around ~5 hours on the
        # hdfs sensor.
        schedule='0 3 * * 5',
        # As a weekly job there should never really be more than
        # one running at a time.
        max_active_runs=1,
        catchup=True,
        user_defined_macros={
            'p': pendulum,
        },
        tags=["weekly", "query_service", "wikidata"]
) as wikidata_dag:
    # We have weekly runs and airflow triggers a run just after the end of its data
    # interval: a data_interval_start on Fri Jun 5 means the run starts around
    # Fri Jun 12, but we want to wait for the dumps generated on Mon Jun 8.
    all_ttl_ds = "{{ data_interval_start.next(day_of_week=p.MONDAY) | ds_nodash }}"
    lexemes_ttl_ds = "{{ data_interval_start | ds_nodash }}"
    path = f"{props.all_ttl_dump_dir}/{all_ttl_ds}/_IMPORTED"
    wiki = "wikidata"
    rdf_table_and_partition = f'{props.rdf_data_table}/date={all_ttl_ds}/wiki={wiki}'
    rdf_subgraphs_table_and_partition = f'{props.rdf_data_subgraphs_table}/snapshot={all_ttl_ds}/wiki={wiki}'

    all_ttl_sensor = URLSensor(
        task_id="wait_for_all_ttl_dump",
        url=path,
        poke_interval=timedelta(hours=1).total_seconds(),
        timeout=timedelta(days=1).total_seconds())

    path = f"{props.lexemes_ttl_dump_dir}/{lexemes_ttl_ds}/_IMPORTED"
    lexeme_ttl_sensor = URLSensor(
        task_id="wait_for_lexemes_ttl_dump",
        url=path,
        poke_interval=timedelta(hours=1).total_seconds(),
        timeout=timedelta(days=1).total_seconds())

    input_path = "{all_base}/{all_ttl_ds}/wikidata-{all_ttl_ds}-all-BETA.ttl.bz2," \
                 "{lexemes_base}/{lexemes_ttl_ds}/" \
                 "wikidata-{lexemes_ttl_ds}-lexemes-BETA.ttl.bz2".format(
                     all_base=props.all_ttl_dump_dir,
                     all_ttl_ds=all_ttl_ds,
                     lexemes_base=props.lexemes_ttl_dump_dir,
                     lexemes_ttl_ds=lexemes_ttl_ds)
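
    # With hypothetical snapshot dates all_ttl_ds=20230206 and lexemes_ttl_ds=20230203
    # the rendered input_path is the comma separated pair
    #   .../wikidata/dumps/all_ttl/20230206/wikidata-20230206-all-BETA.ttl.bz2,
    #   .../wikidata/dumps/lexemes_ttl/20230203/wikidata-20230203-lexemes-BETA.ttl.bz2
    # so both dumps are munged into the same wikibase_rdf partition below.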

    site = "wikidata"
    munge_and_import_dumps = SparkSubmitOperator(
        task_id='munge_dumps',
        application=wdqs_spark_tools,
        java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.WikibaseRDFDumpConverter",  # noqa
        max_executors=25,
        executor_cores=8,
        executor_memory="16g",
        driver_memory="2g",
        conf={
            **wikidata_dag.default_args.get('conf'),
            # Our hdfs defaults do not allow world-read. This dataset contains
            # only public data, so we make it readable by all.
            'spark.hadoop.fs.permissions.umask-mode': '0022'
        },
        application_args=[
            '--input-path', input_path,
            '--output-table', rdf_table_and_partition,
            '--skolemize',
            '--site', wiki,
        ]
    )

    generate_entity_rev_map = SparkSubmitOperator(
        task_id='gen_rev_map',
        application=wdqs_spark_tools,
        java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.EntityRevisionMapGenerator",  # noqa
        max_executors=25,
        executor_cores=8,
        executor_memory="16g",
        driver_memory="2g",
        application_args=[
            '--input-table', rdf_table_and_partition,
            '--output-path', f"{props.entity_revision_map_wdqs}/{all_ttl_ds}/rev_map.csv",
        ]
    )

    split_subgraphs = SparkSubmitOperator(
        task_id='split_subgraphs',
        pool="sequential",
        application=wdqs_spark_tools,
        java_class="org.wikidata.query.rdf.spark.transform.structureddata.dumps.ScholarlyArticleSplit",  # noqa
        max_executors=128,
        executor_cores=4,
        executor_memory="12g",
        executor_memory_overhead="4g",
        driver_memory="16g",
        driver_cores=2,
        sql_shuffle_partitions=512,
        conf={
            **wikidata_dag.default_args.get('conf'),
            # As noted above, our HDFS defaults do not allow world-read.
            # This dataset contains only public data, so we make it readable by all.
            'spark.hadoop.fs.permissions.umask-mode': '0022',
            # This runs better without Spark being eager to broadcast join.
            # (Although even with this setting it may attempt some.)
            'spark.sql.autoBroadcastJoinThreshold': -1
        },
        application_args=[
            '--input-table-partition-spec', rdf_table_and_partition,
            '--output-table-partition-spec', rdf_subgraphs_table_and_partition,
        ]
    )

    export_n3_data_tasks = [
        export_n3_data(rdf_table_and_partition,
                       f"{data_path}/{props.munged_n3_dump_location}/wikidata/full/{all_ttl_ds}",
                       "wikidata_full.%04d.nt.gz", 2048, "export_n3_full",
                       conf=wikidata_dag.default_args.get("conf")),
        export_n3_data(f"{rdf_subgraphs_table_and_partition}/scope=wikidata_main",
                       f"{data_path}/{props.munged_n3_dump_location}/wikidata/main/{all_ttl_ds}",
                       "wikidata_main.%04d.nt.gz", 1024, "export_n3_main",
                       conf=wikidata_dag.default_args.get("conf")),
        export_n3_data(f"{rdf_subgraphs_table_and_partition}/scope=scholarly_articles",
                       f"{data_path}/{props.munged_n3_dump_location}/wikidata/scholarly/{all_ttl_ds}",
                       "scholarly_articles.%04d.nt.gz", 1024, "export_n3_scholarly",
                       conf=wikidata_dag.default_args.get("conf")),
    ]

    end = EmptyOperator(task_id='complete')

    all_ttl_sensor >> munge_and_import_dumps
    lexeme_ttl_sensor >> munge_and_import_dumps
    munge_and_import_dumps >> generate_entity_rev_map >> end
    munge_and_import_dumps >> split_subgraphs
    for n3_export_task in export_n3_data_tasks:
        split_subgraphs >> n3_export_task >> end
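
    # For reference, the resulting import_wikidata_ttl task ordering is:
    #   [wait_for_all_ttl_dump, wait_for_lexemes_ttl_dump] >> munge_dumps
    #   munge_dumps >> gen_rev_map >> complete
    #   munge_dumps >> split_subgraphs >> [export_n3_full, export_n3_main,
    #                                      export_n3_scholarly] >> complete
    # Note that export_n3_full reads the unsplit wikibase_rdf partition but is still
    # ordered after split_subgraphs, so all three exports run once the split is done.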