Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Create functional producer for BufferedStorageBackend #5462

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).

## Pending

### New Features
* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).

### Stellar Core Protocol 21 Configuration Update:
* BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated.
* A new mandatory parameter, `DEPRECATED_SQL_LEDGER_STATE`, has been added with a default value of false which equivalent to `EXPERIMENTAL_BUCKETLIST_DB` being set to true.
Expand Down
147 changes: 147 additions & 0 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cdp
sreuland marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig {

config := ledgerbackend.BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// PublishFromBufferedStorageBackend - create an internal instance
// of BufferedStorageBackend using provided config and emit
// ledger metadata for the requested range by invoking the provided callback
// once per ledger.
//
// The function is blocking, it will only return when a bounded range
// is completed, the ctx is canceled, or an error occurs.
//
// ledgerRange - the requested range, can be bounded or unbounded.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the internal ledger processing,
// when canceled, the function will return asap with that error.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the processing will stop and return an error asap.
//
// return - error, function only returns if requested range is bounded or an error occured.
// nil will be returned only if bounded range requested and completed processing with no errors.
// otherwise return will always be an error.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}

dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
return fmt.Errorf("failed to create datastore: %w", err)
}

var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
return fmt.Errorf("failed to create buffered storage backend: %w", err)
}

if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
return fmt.Errorf("invalid end value for bounded range, must be greater than start")
}

if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
return fmt.Errorf("invalid end value for unbounded range, must be zero")
}

from := ordered.Max(2, ledgerRange.From())
ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
return fmt.Errorf("error getting ledger, %w", err)
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
return fmt.Errorf("received an error from callback invocation: %w", err)
}
}
return nil
}
Loading
Loading