Pelias Database Client

3.3.0 · active · verified Wed Apr 22

`pelias-dbclient` is a core Node.js module in the Pelias geocoder ecosystem that provides a stream-based interface for efficiently bulk-inserting documents into Elasticsearch. It serves as a pipeline stage in Pelias import processes, transforming Pelias `Document` objects into Elasticsearch-compatible bulk operations. The current stable version is v3.3.0; releases follow a feature-driven rather than strictly time-based cadence, with the most recent major version (v3.0.0) released in March 2024. Key differentiators include tight integration with the Pelias data model, a streaming design suited to large-scale data ingestion, and open-source development as part of the broader Pelias open-data geocoding project. It uses the official `elasticsearch` client under the hood and is designed specifically for Node.js environments.
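To make the Document-to-bulk-operation mapping concrete, here is a minimal sketch, not the library's actual implementation: the Elasticsearch bulk API expects each document as a pair of newline-delimited JSON lines, an action header followed by the document source. The `toBulkOps` helper and the sample document shape below are illustrative assumptions.

```javascript
'use strict';

// Sketch only: convert plain { id, body } records into the newline-delimited
// action/source pairs that the Elasticsearch bulk API consumes.
function toBulkOps(docs, indexName) {
  const lines = [];
  for (const doc of docs) {
    // Action header: tells Elasticsearch which index and _id to write to.
    lines.push(JSON.stringify({ index: { _index: indexName, _id: doc.id } }));
    // Source line: the document body itself.
    lines.push(JSON.stringify(doc.body));
  }
  return lines.join('\n') + '\n'; // the bulk API requires a trailing newline
}

const ops = toBulkOps(
  [{ id: 'docType:1', body: { name: { default: 'Test Venue 1' } } }],
  'pelias'
);
console.log(ops);
```

A real Pelias `Document` carries far more structure (names, centroid, parent hierarchy), but the two-line-per-document framing is the essence of what a bulk-insert stage must produce.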

Install

npm install pelias-dbclient

Imports

const dbclient = require('pelias-dbclient');

Quickstart

This quickstart demonstrates how to use `pelias-dbclient` as a transform stream to bulk-insert Pelias Document objects into Elasticsearch, followed by a cleanup operation upon stream completion. It shows the typical integration within a Pelias import pipeline.

'use strict';

const streamify = require('stream-array');
const through = require('through2');
const Document = require('pelias-model').Document;
const dbMapper = require('pelias-model').createDocumentMapperStream;
const dbclient = require('pelias-dbclient');

const elasticsearch = require('elasticsearch');
const config = require('pelias-config').generate(); // ensure pelias-config is set up for your environment

const timestamp = Date.now();

// Simulate an upstream data source
const stream = streamify([1, 2, 3, 4, 5])
  .pipe(through.obj((item, enc, next) => {
    // Create a Pelias Document for each item
    const uniqueId = [ 'docType', item ].join(':');
    const doc = new Document( 'sourceType', 'venue', uniqueId );
    doc.timestamp = timestamp;
    doc.setName('default', `Test Venue ${item}`);
    doc.setCentroid({ lon: item * 0.1, lat: item * 0.2 });
    next(null, doc);
  }))
  .pipe(dbMapper()) // Map Pelias Document to Elasticsearch format
  .pipe(dbclient()); // Bulk-insert documents into Elasticsearch

stream.on('finish', () => {
  console.log('All documents processed and sent to Elasticsearch.');
  const client = new elasticsearch.Client(config.esclient);
  // Example of a post-import operation: clean up old documents
  const options = {
    index: config.schema.indexName,
    body: {
      query: {
        bool: {
          must: [
            { term: { source: 'sourceType' } }
          ],
          must_not: [
            { term: { timestamp: timestamp } }
          ]
        }
      }
    }
  };

  client.deleteByQuery(options, (err, response) => {
    if (err) {
      console.error('Error during cleanup:', err);
    } else {
      console.log(`Cleaned up ${response.deleted} old documents.`);
    }
    client.close();
  });
});

stream.on('error', (err) => {
  console.error('Stream encountered an error:', err);
});
