Google Dataflow: get job name and start time from the running pipeline itself

By : Ou Zza
Date : October 16 2020, 06:10 AM
It can be done, but it is a bit tricky with the current implementation of the template feature.
For the job ID, you can follow this code snippet: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java#L178

Running Google Dataflow pipeline from a Google App Engine app?

By : Mark Brend
Date : March 29 2020, 07:55 AM
The short answer is that if you use App Engine on a Managed VM, you will not run into the App Engine sandbox limits (OOM when using an F1 or B1 instance class, execution time limits, whitelisted JRE classes). If you really want to run within the App Engine sandbox, then your use of the Dataflow SDK must conform to the limits of that sandbox. Below I explain common issues and what people have done to stay within the App Engine sandbox limits.
The Dataflow SDK requires an App Engine instance class with enough memory to run the user's application: construct the pipeline, stage any resources, and send the job description to the Dataflow service. Typically we have seen users need an instance class with more than 128 MB of memory to avoid OOM errors.
code :
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ

Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?

By : neeljain
Date : March 29 2020, 07:55 AM
I am trying to execute a Dataflow pipeline job that runs one function on N entries at a time from Datastore. In my case, this function sends a batch of 100 entries to a REST service as its payload. This means that I want to go through all entries of one Datastore entity kind and send them to an external REST service 100 at a time.
You can batch these elements up within your DoFn. For example:
code :
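import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import java.util.ArrayList;
import java.util.List;

final int BATCH_SIZE = 100;

pipeline
  // 1. Read input from Datastore
  .apply(DatastoreIO.readFrom(datasetId, query))

  // 2. Programmatically batch entries (processEntity is your own helper)
  .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
    private List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);

    @Override
    public void processElement(ProcessContext c) throws Exception {
      accumulator.add(processEntity(c));
      if (accumulator.size() >= BATCH_SIZE) {
        c.output(accumulator);
        accumulator = new ArrayList<>(BATCH_SIZE);
      }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
      // Flush the last, partially filled batch of this bundle
      if (!accumulator.isEmpty()) {
        c.output(accumulator);
        accumulator = new ArrayList<>(BATCH_SIZE);
      }
    }
  }))

  // 3. Consume those batches (sendToRESTEndpoint is your own helper)
  .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      sendToRESTEndpoint(c.element());
    }
  }));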
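The batching logic itself does not depend on Dataflow, so it can be tried out locally. Below is a minimal stand-alone sketch in plain Java, assuming a hypothetical `Batcher` class (not part of the Dataflow SDK): `add` plays the role of `processElement`, `flush` plays the role of `finishBundle`, and the `sink` callback stands in for the REST call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the Dataflow SDK) that buffers elements
 * and hands them to a sink in batches of at most batchSize.
 */
final class Batcher<T> {
  private final int batchSize;
  private final Consumer<List<T>> sink; // stands in for the REST call
  private List<T> buffer;

  Batcher(int batchSize, Consumer<List<T>> sink) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.sink = sink;
    this.buffer = new ArrayList<>(batchSize);
  }

  /** Counterpart of processElement(): buffer, emit once the batch is full. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      emit();
    }
  }

  /** Counterpart of finishBundle(): emit the partially filled last batch. */
  void flush() {
    if (!buffer.isEmpty()) {
      emit();
    }
  }

  private void emit() {
    // Hand the current list off and start a new one instead of clearing it,
    // so a batch that has already been emitted is never modified.
    sink.accept(buffer);
    buffer = new ArrayList<>(batchSize);
  }
}
```

Feeding 250 elements through `new Batcher<>(100, sink)` and then calling `flush()` yields batches of 100, 100 and 50. Because the Dataflow version flushes in `finishBundle`, every bundle can end with a batch smaller than `BATCH_SIZE`, so the REST endpoint should accept short batches.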

"ClassNotFoundException: sun.security.provider.Sun" when running Google Cloud Dataflow pipeline in Google App

By : Cloud Entreprises
Date : March 29 2020, 07:55 AM
The problem is that sun.security.provider.Sun doesn't appear on the App Engine JRE whitelist, so the classloader can't instantiate instances of it. One workaround is to avoid serializing the SecureRandom and recreate it on deserialization instead:
code :
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Random;

class Example implements Serializable {

  // See comments on {@link #writeObject} for why this is transient.
  // Should be treated as final, but can't be declared as such.
  private transient Random random;

  // [Guts of the class go here...]

  /**
   * Serialization hook to handle the transient Random field.
   */
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // write the non-transient fields first
    if (random instanceof SecureRandom) {
      // Write a null to tell readObject() to create a new
      // SecureRandom during deserialization; null is safe to use
      // as a placeholder because the constructor disallows null
      // Randoms.
      // The Dataflow cloud environment won't deserialize
      // SecureRandom instances that use sun.security.provider.Sun
      // as their Provider, because it's a system
      // class that's not on the App Engine whitelist:
      // https://cloud.google.com/appengine/docs/java/jrewhitelist
      out.writeObject(null);
    } else {
      out.writeObject(random);
    }
  }

  /**
   * Deserialization hook to initialize the transient Random field.
   */
  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    in.defaultReadObject(); // read the non-transient fields first
    Object newRandom = in.readObject();
    if (newRandom == null) {
      // writeObject() will write a null if the original field was
      // SecureRandom; create a new instance to replace it. See
      // comments in writeObject() for background.
      random = new SecureRandom();
      random.nextDouble(); // force seeding
    } else {
      random = (Random) newRandom;
    }
  }
}
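A different way to avoid shipping a SecureRandom to the workers, sketched here as an alternative rather than as the approach described above, is to never serialize the `Random` at all: keep the field `transient` and create it lazily on first use, inside whatever JVM the object ends up in. `LazyRandomHolder` and `RoundTrip` are hypothetical names; `RoundTrip.copy` only mimics, within a single JVM, the Java serialization round trip a function object goes through when it is sent to workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Random;

/** Hypothetical example: the Random is never written to the stream. */
class LazyRandomHolder implements Serializable {
  private static final long serialVersionUID = 1L;

  // transient: default serialization skips this field, so no SecureRandom
  // (and therefore no reference to its provider) is ever serialized.
  private transient Random random;

  private Random getRandom() {
    if (random == null) {
      random = new SecureRandom(); // created in the current JVM on first use
    }
    return random;
  }

  int nextInt(int bound) {
    return getRandom().nextInt(bound);
  }

  /** Exposed only so the lazy behaviour can be checked. */
  boolean hasRandom() {
    return random != null;
  }
}

/** Hypothetical helper: serialize and deserialize an object in memory. */
final class RoundTrip {
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copy(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}
```

After `RoundTrip.copy`, the copy's `random` field is `null` until `nextInt` is first called. The trade-off compared with the `writeObject`/`readObject` hooks is that this version always uses a fresh SecureRandom, so a caller-supplied non-secure `Random` (for example a seeded one used in tests) cannot be carried across serialization.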

How to sandbox/limit access for a Google Cloud Dataflow pipeline running in the cloud?

By : Deborah Díaz
Date : March 29 2020, 07:55 AM
First of all, user code running on Dataflow's worker VMs uses the Compute Engine service account credentials by default, regardless of who launched the job or from where.
So basically your question can be reinterpreted as:

Beam pipeline not moving in Google Dataflow while running ok on direct runner

By : user2768949
Date : March 29 2020, 07:55 AM
After digging around, I found the full log by clicking Stackdriver in the upper-right corner of the log view. It contained the following:
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:54) ....
