@@ -508,31 +508,37 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     bq_schema_unused = set()
 
     bq_schema_out = []
-    unknown_type_fields = []
-
+    unknown_type_columns = []
+    dataframe_reset_index = dataframe.reset_index()
     for column, dtype in list_columns_and_indexes(dataframe):
-        # Use provided type from schema, if present.
+        # Step 1: use provided type from schema, if present.
         bq_field = bq_schema_index.get(column)
         if bq_field:
             bq_schema_out.append(bq_field)
             bq_schema_unused.discard(bq_field.name)
             continue
 
-        # Otherwise, try to automatically determine the type based on the
+        # Step 2: try to automatically determine the type based on the
         # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if bq_type is None:
-            sample_data = _first_valid(dataframe.reset_index()[column])
+            sample_data = _first_valid(dataframe_reset_index[column])
             if (
                 isinstance(sample_data, _BaseGeometry)
                 and sample_data is not None  # Paranoia
             ):
                 bq_type = "GEOGRAPHY"
-        bq_field = schema.SchemaField(column, bq_type)
-        bq_schema_out.append(bq_field)
+        if bq_type is not None:
+            bq_schema_out.append(schema.SchemaField(column, bq_type))
+            continue
+
+        # Step 3: try with pyarrow, if available.
+        bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column])
+        if bq_field is not None:
+            bq_schema_out.append(bq_field)
+            continue
 
-        if bq_field.field_type is None:
-            unknown_type_fields.append(bq_field)
+        unknown_type_columns.append(column)
 
     # Catch any schema mismatch. The developer explicitly asked to serialize a
     # column, but it was not found.
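
For context, the reworked loop resolves each column's type through three ordered fallbacks: an explicit entry in the user-supplied bq_schema, the _PANDAS_DTYPE_TO_BQ dtype lookup (plus the GEOGRAPHY sniffing for shapely values), and finally pyarrow-based detection. A minimal standalone sketch of that cascade, assuming only pandas and (optionally) pyarrow are installed; the detect_type helper and the trimmed dtype map are illustrative, not the library's actual tables:

import pandas as pd

# Illustrative subset; the real mapping lives in _PANDAS_DTYPE_TO_BQ.
_DTYPE_TO_BQ = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN"}

def detect_type(series: pd.Series):
    # Analogue of step 2: map the pandas dtype name if it is known.
    bq_type = _DTYPE_TO_BQ.get(series.dtype.name)
    if bq_type is not None:
        return bq_type
    # Analogue of step 3: fall back to pyarrow's type inference.
    try:
        import pyarrow
    except ImportError:
        return None  # Analogue of recording an unknown-type column.
    return str(pyarrow.array(series).type)  # e.g. "string", "timestamp[us]"

print(detect_type(pd.Series([1, 2])))      # INTEGER (dtype lookup)
print(detect_type(pd.Series(["a", "b"])))  # string (pyarrow fallback)
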
@@ -543,98 +549,70 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
             )
         )
 
-    # If schema detection was not successful for all columns, also try with
-    # pyarrow, if available.
-    if unknown_type_fields:
-        if not pyarrow:
-            msg = "Could not determine the type of columns: {}".format(
-                ", ".join(field.name for field in unknown_type_fields)
-            )
-            warnings.warn(msg)
-            return None  # We cannot detect the schema in full.
-
-        # The augment_schema() helper itself will also issue unknown type
-        # warnings if detection still fails for any of the fields.
-        bq_schema_out = augment_schema(dataframe, bq_schema_out)
+    if unknown_type_columns:
+        msg = "Could not determine the type of columns: {}".format(
+            ", ".join(unknown_type_columns)
+        )
+        warnings.warn(msg)
+        return None  # We cannot detect the schema in full.
 
-    return tuple(bq_schema_out) if bq_schema_out else None
+    return tuple(bq_schema_out)
 
 
-def augment_schema(dataframe, current_bq_schema):
-    """Try to deduce the unknown field types and return an improved schema.
+def _get_schema_by_pyarrow(name, series):
+    """Attempt to detect the type of the given series by leveraging PyArrow's
+    type detection capabilities.
 
-    This function requires ``pyarrow`` to run. If all the missing types still
-    cannot be detected, ``None`` is returned. If all types are already known,
-    a shallow copy of the given schema is returned.
+    This function requires the ``pyarrow`` library to be installed. If the
+    series type cannot be determined or ``pyarrow`` is not available,
+    ``None`` is returned.
 
     Args:
-        dataframe (pandas.DataFrame):
-            DataFrame for which some of the field types are still unknown.
-        current_bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
-            A BigQuery schema for ``dataframe``. The types of some or all of
-            the fields may be ``None``.
+        name (str):
+            The column name for the resulting SchemaField.
+        series (pandas.Series):
+            The Series data for which to detect the data type.
     Returns:
-        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]
+        Optional[google.cloud.bigquery.schema.SchemaField]:
+            A ``SchemaField`` holding the detected BigQuery-compatible type
+            (e.g. "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC",
+            "BIGNUMERIC") and mode ("NULLABLE" or "REPEATED").
+            Returns ``None`` if the type cannot be determined or ``pyarrow``
+            is not imported.
     """
-    # pytype: disable=attribute-error
-    augmented_schema = []
-    unknown_type_fields = []
-    for field in current_bq_schema:
-        if field.field_type is not None:
-            augmented_schema.append(field)
-            continue
-
-        arrow_table = pyarrow.array(dataframe.reset_index()[field.name])
-
-        if pyarrow.types.is_list(arrow_table.type):
-            # `pyarrow.ListType`
-            detected_mode = "REPEATED"
-            detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(
-                arrow_table.values.type.id
-            )
-
-            # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
-            # it to such datetimes, causing them to be recognized as TIMESTAMP type.
-            # We thus additionally check the actual data to see if we need to overrule
-            # that and choose DATETIME instead.
-            # Note that this should only be needed for datetime values inside a list,
-            # since scalar datetime values have a proper Pandas dtype that allows
-            # distinguishing between timezone-naive and timezone-aware values before
-            # even requiring the additional schema augment logic in this method.
-            if detected_type == "TIMESTAMP":
-                valid_item = _first_array_valid(dataframe[field.name])
-                if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
-                    detected_type = "DATETIME"
-        else:
-            detected_mode = field.mode
-            detected_type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
-            if detected_type == "NUMERIC" and arrow_table.type.scale > 9:
-                detected_type = "BIGNUMERIC"
 
-        if detected_type is None:
-            unknown_type_fields.append(field)
-            continue
+    if not pyarrow:
+        return None
 
-        new_field = schema.SchemaField(
-            name=field.name,
-            field_type=detected_type,
-            mode=detected_mode,
-            description=field.description,
-            fields=field.fields,
-        )
-        augmented_schema.append(new_field)
+    arrow_table = pyarrow.array(series)
+    if pyarrow.types.is_list(arrow_table.type):
+        # `pyarrow.ListType`
+        mode = "REPEATED"
+        type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id)
+
+        # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
+        # it to such datetimes, causing them to be recognized as TIMESTAMP type.
+        # We thus additionally check the actual data to see if we need to overrule
+        # that and choose DATETIME instead.
+        # Note that this should only be needed for datetime values inside a list,
+        # since scalar datetime values have a proper Pandas dtype that allows
+        # distinguishing between timezone-naive and timezone-aware values before
+        # even requiring the additional schema detection logic in this method.
+        if type == "TIMESTAMP":
+            valid_item = _first_array_valid(series)
+            if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
+                type = "DATETIME"
+    else:
+        mode = "NULLABLE"  # default mode
+        type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
+        if type == "NUMERIC" and arrow_table.type.scale > 9:
+            type = "BIGNUMERIC"
 
-    if unknown_type_fields:
-        warnings.warn(
-            "Pyarrow could not determine the type of columns: {}.".format(
-                ", ".join(field.name for field in unknown_type_fields)
-            )
-        )
+    if type is not None:
+        return schema.SchemaField(name, type, mode)
+    else:
         return None
 
-    return augmented_schema
-    # pytype: enable=attribute-error
-
 
 def dataframe_to_arrow(dataframe, bq_schema):
     """Convert pandas dataframe to Arrow table, using BigQuery schema.