Step 5: Incremental Reads
We now have a working implementation of a connector reading the latest exchange rates for a given currency. In this section, we'll update the source to read historical data instead of only reading the latest exchange rates.
According to the API documentation, we can read the exchange rate for a specific date by querying the "/exchangerates_data/{date}" endpoint instead of "/exchangerates_data/latest".
We'll now add a start_date property to the connector.
First we'll update the spec block in source_exchange_rates_tutorial/manifest.yaml
spec: 
  documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    title: exchangeratesapi.io Source Spec
    type: object
    required:
      - start_date
      - access_key
      - base
    additionalProperties: true
    properties:
      start_date:
        type: string
        description: Start getting data from that date.
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
        examples:
          - YYYY-MM-DD
      access_key:
        type: string
        description: >-
          Your API Access Key. See <a
          href="https://exchangeratesapi.io/documentation/">here</a>. The key is
          case sensitive.
        airbyte_secret: true
      base:
        type: string
        description: >-
          ISO reference currency. See <a
          href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
        examples:
          - EUR
          - USD
Then we'll set the start_date to last week in our connection config in secrets/config.json.
Let's add a start_date field to secrets/config.json.
The file should look like
{
  "access_key": "<your_access_key>",
  "start_date": "2022-07-26",
  "base": "USD"
}
where the start date should be 7 days in the past.
And we'll update the path in the connector definition to point to /{{ config.start_date }}.
Note that we are setting a default value because the check operation does not know the start_date. We'll default to hitting /exchangerates_data/latest:
definitions:
  <...>
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
You can test these changes by executing the read operation:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
By reading the output record, you should see that we read historical data instead of the latest exchange rate. For example:
"historical": true, "base": "USD", "date": "2022-07-18"
The connector will now always read data for the start date, which is not exactly what we want.
Instead, we would like to iterate over all the dates between the start_date and today and read data for each day.
We can do this by adding a DatetimeBasedCursor to the connector definition, and update the path to point to the stream_slice's start_date:
More details on incremental syncs can be found here.
Let's first define a datetime cursor at the top level of the connector definition:
definitions:
  datetime_cursor:
    type: "DatetimeBasedCursor"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "P1D"
    datetime_format: "%Y-%m-%d"
    cursor_granularity: "P1D"
    cursor_field: "date"
and refer to it in the stream.
This will generate time windows from the start time until the end time, where each window is exactly one day.
The start time is defined in the config file, while the end time is defined by the now_utc() macro, which will evaluate to the current date in the current timezone at runtime. See the section on string interpolation for more details.
definitions:
  <...>
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
We'll also update the base stream to use the datetime cursor:
definitions:
  <...>
  base_stream:
    <...>
    incremental_sync:
      $ref: "#/definitions/datetime_cursor"
Finally, we'll update the path to point to the stream_slice's start_time
definitions:
  <...>
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
The full connector definition should now look like ./source_exchange_rates_tutorial/manifest.yaml:
version: "0.1.0"
definitions:
  selector:
    extractor:
      field_path: [ ]
  requester:
    url_base: "https://api.apilayer.com"
    http_method: "GET"
    authenticator:
      type: ApiKeyAuthenticator
      header: "apikey"
      api_token: "{{ config['access_key'] }}"
    request_options_provider:
      request_parameters:
        base: "{{ config['base'] }}"
  datetime_cursor:
    type: "DatetimeBasedCursor"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "P1D"
    datetime_format: "%Y-%m-%d"
    cursor_granularity: "P1D"
    cursor_field: "date"
  retriever:
    record_selector:
      $ref: "#/definitions/selector"
    paginator:
      type: NoPagination
    requester:
      $ref: "#/definitions/requester"
  base_stream:
    incremental_sync:
      $ref: "#/definitions/datetime_cursor"
    retriever:
      $ref: "#/definitions/retriever"
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
streams:
  - "#/definitions/rates_stream"
check:
  stream_names:
    - "rates"
spec: 
  documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    title: exchangeratesapi.io Source Spec
    type: object
    required:
      - start_date
      - access_key
      - base
    additionalProperties: true
    properties:
      start_date:
        type: string
        description: Start getting data from that date.
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
        examples:
          - YYYY-MM-DD
      access_key:
        type: string
        description: >-
          Your API Access Key. See <a
          href="https://exchangeratesapi.io/documentation/">here</a>. The key is
          case sensitive.
        airbyte_secret: true
      base:
        type: string
        description: >-
          ISO reference currency. See <a
          href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
        examples:
          - EUR
          - USD
Running the read operation will now read all data for all days between start_date and now:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
The operation should now output more than one record:
{"type": "LOG", "log": {"level": "INFO", "message": "Read 8 records from rates stream"}}
Supporting incremental syncs
Instead of always reading data for all dates, we would like the connector to only read data for dates we haven't read yet.
This can be achieved by updating the catalog to run in incremental mode (integration_tests/configured_catalog.json):
{
  "streams": [
    {
      "stream": {
        "name": "rates",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh",
          "incremental"
        ]
      },
      "sync_mode": "incremental",
      "destination_sync_mode": "overwrite"
    }
  ]
}
In addition to records, the read operation now also outputs state messages:
{"type": "STATE", "state": {"data": {"rates": {"date": "2022-07-15"}}}}
Where the date ("2022-07-15") should be replaced by today's date.
We can simulate incremental syncs by creating a state file containing the last state produced by the read operation.
source-exchange-rates-tutorial/integration_tests/sample_state.json:
{
  "rates": {
    "date": "2022-07-15"
  }
}
Running the read operation will now only read data for dates later than the given state:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/sample_state.json
There shouldn't be any data read if the state is today's date:
{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of rates stream to {'date': '2022-07-15'}"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from rates stream"}}
Next steps:
Next, we'll run the Connector Acceptance Tests suite to ensure the connector invariants are respected.