Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7567

StateSpec should contain the state id

Details

    Description

      In the Java SDK we currently ask users to define a state spec as:

      new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {
      
        // A state cell holding a single Integer per key+window
        @StateId("index")
        private final StateSpec<ValueState<Integer>> indexSpec =
            StateSpecs.value(VarIntCoder.of());
      
        @ProcessElement
        public void processElement(
            ProcessContext context,
            @StateId("index") ValueState<Integer> index) {
          int current = firstNonNull(index.read(), 0);
          context.output(KV.of(current, context.element()));
          index.write(current+1);
        }
      }
      

      The suggestion is to move the @StateId into the StateSpec so the could would look like:

      new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {
      
        // A state cell holding a single Integer per key+window
        private final StateSpec<ValueState<Integer>> indexSpec =
            StateSpecs.value("index", VarIntCoder.of());
      
        @ProcessElement
        public void processElement(
            ProcessContext context,
            @StateId("index") ValueState<Integer> index) {
          int current = firstNonNull(index.read(), 0);
          context.output(KV.of(current, context.element()));
          index.write(current+1);
        }
      }
      

      We should also remove the coder from the StateSpec hashCode and equals method.

      Attachments

        Activity

          People

            Unassigned Unassigned
            lcwik Luke Cwik
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: