Spaces:
Sleeping
Sleeping
File size: 3,692 Bytes
3bf8430 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
import random
from typing import Optional
from ...environment import VerifiableEnvironment
class BoundedMeanSubarrayCounting_Environment(VerifiableEnvironment) :
prompt_template = \
r"""Given an array A of length {N}:
{A}
How many nonempty contiguous subarrays have a mean greater than or equal to {K}?
**Output Format:** Your final answer should be a single integer — the total number of nonempty subarrays of A whose mean is greater than or equal to {K}."""
def __init__(self,
wrong_format : float = -1.0, rewarding_strategy : str = "(min/max)^beta", rewarding_weight : float = 1.0, rewarding_beta : float = 10.0,
**kwargs) :
"""
Initialize the BoundedIntervalCounting_Environment instance.
"""
super().__init__(**kwargs)
self.rewards = {
"wrong_format" : wrong_format,
"rewarding_strategy" : rewarding_strategy,
"rewarding_weight" : rewarding_weight,
"rewarding_beta" : rewarding_beta,
}
def _generate(self) -> None :
assert "N" in self.parameter, "N is required in parameter"
N = self.parameter["N"]
assert N >= 2, "N should be greater than or equal to 2"
A = self.parameter["A"] = [random.randint(0, N) for _ in range(N)]
K = self.parameter["K"] = random.randint(min(A), max(A))
v = [0] * (N + 1)
for i in range(1, N + 1) :
v[i] = v[i - 1] + A[i - 1] - K
tmp = [0] * (N + 1)
res = 0
def cdq(l, r) :
nonlocal res
if l >= r :
return
mid = (l + r) // 2
cdq(l, mid)
cdq(mid + 1, r)
i, j = l, mid + 1
sum_left = 0
for k in range(l, r + 1) :
if j > r or (i <= mid and v[i] <= v[j]) :
sum_left += 1
tmp[k] = v[i]
i += 1
else :
res += sum_left
tmp[k] = v[j]
j += 1
for k in range(l, r + 1) :
v[k] = tmp[k]
cdq(0, N)
assert res > 0
self.parameter["reference_answer"] = res
def _prompt_generate(self) -> str :
return self.prompt_template.format(
N = self.parameter["N"],
A = " ".join(map(str, self.parameter["A"])),
K = self.parameter["K"],
)
def _process(self, answer : Optional[str]) -> Optional[int] :
if answer is not None :
answer = answer.strip()
try :
int_answer = int(answer)
return int_answer
except ValueError :
return None
else :
return None
def scorer(self, output : str) -> float :
processed_result = self.processor(output)
if processed_result is not None :
if processed_result < 0 :
return self.rewards["wrong_format"]
if self.rewards["rewarding_strategy"] == "(min/max)^beta" :
a, b = self.parameter["reference_answer"], processed_result
return self.rewards["rewarding_weight"] * (((min(a, b) / max(a, b))) ** self.rewards["rewarding_beta"])
elif self.rewards["rewarding_strategy"] == "gold=answer" :
return self.rewards["rewarding_weight"] * (processed_result == self.parameter["reference_answer"])
else :
raise NotImplementedError("Unknown rewarding strategy: {}".format(self.rewards["rewarding_strategy"]))
else :
return self.rewards["wrong_format"] |