{"library":"pelias-dbclient","title":"Pelias Database Client","description":"`pelias-dbclient` is a core Node.js module within the Pelias geocoder ecosystem, providing a stream-based interface for efficiently bulk-inserting documents into Elasticsearch. Its primary function is to act as a crucial pipeline stage for Pelias import processes, transforming Pelias `Document` objects into Elasticsearch-compatible bulk operations. The current stable version is v3.3.0, with releases occurring on a feature-driven, rather than strict time-based, cadence, though a major version (v3.0.0) was released in March 2024. Key differentiators include its tight integration with the Pelias data model, its focus on streaming for large-scale data ingestion, and its commitment to open-source principles as part of the broader Pelias open-data geocoding project. It leverages the official `elasticsearch` client under the hood and is designed specifically for Node.js environments.","language":"javascript","status":"active","last_verified":"Wed Apr 22","install":{"commands":["npm install pelias-dbclient"],"cli":null},"imports":["const dbclient = require('pelias-dbclient');","const streamFactory = require('pelias-dbclient');\nconst stream = streamFactory();","const Document = require('pelias-model').Document;"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"'use strict';\n\nconst streamify = require('stream-array');\nconst through = require('through2');\nconst Document = require('pelias-model').Document;\nconst dbMapper = require('pelias-model').createDocumentMapperStream;\nconst dbclient = require('pelias-dbclient');\n\nconst elasticsearch = require('elasticsearch');\nconst config = require('pelias-config').generate(); // Ensure pelias-config is properly set up\nconst elasticDeleteQuery = require('elastic-deletebyquery');\n\nconst timestamp = Date.now();\n\n// Simulate an upstream data source\nconst stream = streamify([1, 2, 3, 4, 5])\n  .pipe(through.obj((item, enc, next) => {\n    // Create a Pelias Document for each item\n    const uniqueId = [ 'docType', item ].join(':');\n    const doc = new Document( 'sourceType', 'venue', uniqueId );\n    doc.timestamp = timestamp;\n    doc.setName('default', `Test Venue ${item}`);\n    doc.setCentroid(item * 0.1, item * 0.2);\n    next(null, doc);\n  }))\n  .pipe(dbMapper()) // Map Pelias Document to Elasticsearch format\n  .pipe(dbclient()); // Bulk-insert documents into Elasticsearch\n\nstream.on('finish', () => {\n  console.log('All documents processed and sent to Elasticsearch.');\n  const client = new elasticsearch.Client(config.esclient);\n  // Example of a post-import operation: clean up old documents\n  const options = {\n    index: config.schema.indexName,\n    body: {\n      query: {\n        \"bool\": {\n          \"must\": [\n            {\"term\": { \"source\":  \"sourceType\" }}\n          ],\n          \"must_not\": [\n            {\"term\": { \"timestamp\":  timestamp }}\n          ]\n        }\n      }\n    }\n  };\n\n  client.deleteByQuery(options, (err, response) => {\n    if (err) {\n      console.error('Error during cleanup:', err);\n    } else {\n      console.log(`Cleaned up ${response.elements || response.deleted} old elements.`);\n    }\n    client.close();\n  });\n});\n\nstream.on('error', (err) => {\n  console.error('Stream encountered an error:', err);\n});","lang":"javascript","description":"This quickstart demonstrates how to use `pelias-dbclient` as a transform stream to bulk-insert Pelias Document objects into Elasticsearch, followed by a cleanup operation upon stream completion. It shows the typical integration within a Pelias import pipeline.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}